From 520a40b1a1431fb0fdc1666c556342410a56e4eb Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Fri, 8 Sep 2017 15:00:35 +0100 Subject: [PATCH] ARTEMIS-1418 AIO Shutdown on IOError and logging --- .../artemis/core/io/AbstractSequentialFile.java | 7 +++++++ .../core/io/AbstractSequentialFileFactory.java | 5 +++++ .../artemis/core/io/aio/AIOSequentialFile.java | 17 +++++++++++++++++ .../core/io/aio/AIOSequentialFileFactory.java | 12 +++++++++++- .../core/server/files/FileStoreMonitor.java | 8 +++++++- .../core/server/impl/ActiveMQServerImpl.java | 2 +- .../core/server/files/FileStoreMonitorTest.java | 4 ++-- 7 files changed, 50 insertions(+), 5 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java index f6cb9b0a26..32168fc65a 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -35,9 +35,12 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.jboss.logging.Logger; public abstract class AbstractSequentialFile implements SequentialFile { + private static final Logger logger = Logger.getLogger(AbstractSequentialFile.class); + private File file; protected final File directory; @@ -267,6 +270,10 @@ public abstract class AbstractSequentialFile implements SequentialFile { @Override public void onError(final int errorCode, final String errorMessage) { + if (logger.isTraceEnabled()) { + logger.trace("onError" + " code: " + errorCode + " message: " + errorMessage); + } + final int size = delegates.size(); for (int i = 0; i < size; i++) { try { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java index 4310e84df2..c6657df22e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -33,12 +33,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.jboss.logging.Logger; /** * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories */ public abstract class AbstractSequentialFileFactory implements SequentialFileFactory { + private static final Logger logger = Logger.getLogger(AbstractSequentialFileFactory.class); + // Timeout used to wait executors to shutdown protected static final int EXECUTOR_TIMEOUT = 60; @@ -161,6 +164,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac public void onIOError(Exception exception, String message, SequentialFile file) { if (critialErrorListener != null) { critialErrorListener.onIOException(exception, message, file); + } else { + logger.warn("Critical IO Error Called. No Critical IO Error Handler Registered"); } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 9d3a824eb1..6ca55387c5 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -36,9 +36,12 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.jlibaio.LibaioFile; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ReusableLatch; +import org.jboss.logging.Logger; public class AIOSequentialFile extends AbstractSequentialFile { + private static final Logger logger = Logger.getLogger(AIOSequentialFileFactory.class); + private boolean opened = false; private LibaioFile aioFile; @@ -134,6 +137,10 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public synchronized void fill(final int size) throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("Filling file: " + getFileName()); + } + checkOpened(); aioFile.fill(size); @@ -149,9 +156,14 @@ public class AIOSequentialFile extends AbstractSequentialFile { public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException { opened = true; + if (logger.isTraceEnabled()) { + logger.trace("Opening file: " + getFileName()); + } + try { aioFile = aioFactory.libaioContext.openFile(getFile(), factory.isDatasync()); } catch (IOException e) { + logger.error("Error opening file: " + getFileName()); factory.onIOError(e, e.getMessage(), this); throw new ActiveMQNativeIOError(e.getMessage(), e); } @@ -176,6 +188,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { // Sending it through the callback would make it released aioFile.read(positionToRead, bytesToRead, bytes, getCallback(callback, null)); } catch (IOException e) { + logger.error("IOError reading file: " + getFileName(), e); factory.onIOError(e, e.getMessage(), this); throw new ActiveMQNativeIOError(e.getMessage(), e); } @@ -196,6 +209,10 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("Write Direct, Sync: " + sync + " File: " + getFileName()); + } + if (sync) { SimpleWaitIOCallback completion = new SimpleWaitIOCallback(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index 51d960a320..df71c160d9 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.ArtemisConstants; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.IOCallback; @@ -77,6 +79,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor final IOCriticalErrorListener listener) { super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener); callbackPool = new CallbackCache<>(maxIO); + if (logger.isTraceEnabled()) { + logger.trace("New AIO File Created"); + } } public AIOSequentialCallback getCallback() { @@ -304,7 +309,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor try { libaioFile.write(position, bytes, buffer, this); } catch (IOException e) { - callback.onError(-1, e.getMessage()); + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); onIOError(e, "Failed to write to file", sequentialFile); } } @@ -337,6 +342,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public void onError(int errno, String message) { + if (logger.isDebugEnabled()) { + logger.trace("AIO on error issued. Error(code: " + errno + " msg: " + message + ")"); + } this.error = true; this.errorCode = errno; this.errorMessage = message; @@ -357,6 +365,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor if (error) { callback.onError(errorCode, errorMessage); + onIOError(new ActiveMQException(errorCode, errorMessage), errorMessage, null); errorMessage = null; } else { if (callback != null) { @@ -385,6 +394,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor libaioContext.poll(); } catch (Throwable e) { ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e); + onIOError(new ActiveMQException("Error on libaio poll"), e.getMessage(), null); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java index 06006879a1..8cd7fef5a0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitor.java @@ -27,6 +27,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.jboss.logging.Logger; @@ -45,14 +46,17 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { private final Set stores = new HashSet<>(); private double maxUsage; private final Object monitorLock = new Object(); + private final IOCriticalErrorListener ioCriticalErrorListener; public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService, Executor executor, long checkPeriod, TimeUnit timeUnit, - double maxUsage) { + double maxUsage, + IOCriticalErrorListener ioCriticalErrorListener) { super(scheduledExecutorService, executor, checkPeriod, timeUnit, false); this.maxUsage = maxUsage; + this.ioCriticalErrorListener = ioCriticalErrorListener; } public FileStoreMonitor addCallback(Callback callback) { @@ -99,6 +103,8 @@ public class FileStoreMonitor extends ActiveMQScheduledComponent { if (over) { break; } + } catch (IOException ioe) { + ioCriticalErrorListener.onIOException(ioe, "IO Error while calculating disk usage", null); } catch (Exception e) { logger.warn(e.getMessage(), e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 68e555936e..311ed4f489 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2413,7 +2413,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } try { - injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f)); + injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, getCriticalIOErrorListener())); } catch (Exception e) { logger.warn(e.getMessage(), e); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java index bc4017c92f..b91d3de260 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileStoreMonitorTest.java @@ -96,7 +96,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase { }; final AtomicBoolean fakeReturn = new AtomicBoolean(false); - FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999) { + FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 100, TimeUnit.MILLISECONDS, 0.999, null) { @Override protected double calculateUsage(FileStore store) throws IOException { if (fakeReturn.get()) { @@ -127,7 +127,7 @@ public class FileStoreMonitorTest extends ActiveMQTestBase { @Test public void testScheduler() throws Exception { - FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9); + FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, executorService, 20, TimeUnit.MILLISECONDS, 0.9, null); final ReusableLatch latch = new ReusableLatch(5); storeMonitor.addStore(getTestDirfile());