diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java index 1c2c0ebc02..6a9a0dd3c9 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java @@ -147,6 +147,8 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { logger.warn(e.getMessage(), e); } } + + actions.clear(); } @Override diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java index c1c56024fb..07d5a3fdc5 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java @@ -25,6 +25,10 @@ public class CriticalComponentImpl implements CriticalComponent { private final CriticalMeasure[] measures; private final CriticalAnalyzer analyzer; + public CriticalAnalyzer getCriticalAnalyzer() { + return analyzer; + } + public CriticalComponentImpl(CriticalAnalyzer analyzer, int numberOfPaths) { if (analyzer == null) { analyzer = EmptyCriticalAnalyzer.getInstance(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java index 050737347e..b9ea6a830f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -33,6 +33,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; +import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.jboss.logging.Logger; /** @@ -74,11 +76,17 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac final int bufferTimeout, final int maxIO, final boolean logRates, - final IOCriticalErrorListener criticalErrorListener) { + final IOCriticalErrorListener criticalErrorListener, + CriticalAnalyzer criticalAnalyzer) { this.journalDir = journalDir; + if (criticalAnalyzer == null) { + criticalAnalyzer = EmptyCriticalAnalyzer.getInstance(); + } + if (buffered && bufferTimeout > 0) { - timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates); + timedBuffer = new TimedBuffer(criticalAnalyzer, bufferSize, bufferTimeout, logRates); + criticalAnalyzer.add(timedBuffer); } else { timedBuffer = null; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index e8cc97ef2d..d8288e6ca0 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioFile; import org.apache.activemq.artemis.jlibaio.SubmitInfo; import org.apache.activemq.artemis.jlibaio.util.CallbackCache; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.jboss.logging.Logger; public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory { @@ -56,11 +57,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor private static final String AIO_TEST_FILE = ".aio-test"; public AIOSequentialFileFactory(final File journalDir, int maxIO) { - this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null); + this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null, null); } public AIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, int maxIO) { - this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, listener); + this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, listener, null); } public AIOSequentialFileFactory(final File journalDir, @@ -68,7 +69,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor final int bufferTimeout, final int maxIO, final boolean logRates) { - this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null); + this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null, null); } public AIOSequentialFileFactory(final File journalDir, @@ -76,8 +77,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor final int bufferTimeout, final int maxIO, final boolean logRates, - final IOCriticalErrorListener listener) { - super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener); + final IOCriticalErrorListener listener, + final CriticalAnalyzer analyzer) { + super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer); callbackPool = new CallbackCache<>(maxIO); if (logger.isTraceEnabled()) { logger.trace("New AIO File Created"); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index f469bf8d5f..e0fe149eb7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -32,9 +32,14 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; +import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.jboss.logging.Logger; -public final class TimedBuffer { +public final class TimedBuffer extends CriticalComponentImpl { + + protected static final int CRITICAL_PATHS = 1; + protected static final int CRITICAL_PATH_FLUSH = 0; private static final Logger logger = Logger.getLogger(TimedBuffer.class); @@ -99,7 +104,8 @@ public final class TimedBuffer { // Public -------------------------------------------------------- - public TimedBuffer(final int size, final int timeout, final boolean logRates) { + public TimedBuffer(CriticalAnalyzer analyzer, final int size, final int timeout, final boolean logRates) { + super(analyzer, CRITICAL_PATHS); bufferSize = size; this.logRates = logRates; @@ -286,38 +292,42 @@ public final class TimedBuffer { throw new IllegalStateException("TimedBuffer is not started"); } - if (!delayFlush && buffer.writerIndex() > 0) { - int pos = buffer.writerIndex(); + enterCritical(CRITICAL_PATH_FLUSH); + try { + if (!delayFlush && buffer.writerIndex() > 0) { + int pos = buffer.writerIndex(); - if (logRates) { - bytesFlushed.addAndGet(pos); + if (logRates) { + bytesFlushed.addAndGet(pos); + } + + final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); + //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!! + bufferToFlush.limit(pos); + //perform memcpy under the hood due to the off heap buffer + buffer.getBytes(0, bufferToFlush); + + bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks); + + stopSpin(); + + pendingSync = false; + + // swap the instance as the previous callback list is being used asynchronously + callbacks = new ArrayList<>(); + + buffer.clear(); + + bufferLimit = 0; + + flushesDone.incrementAndGet(); + + return pos > 0; + } else { + return false; } - - final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); - //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!! - bufferToFlush.limit(pos); - //perform memcpy under the hood due to the off heap buffer - buffer.getBytes(0, bufferToFlush); - - - bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks); - - stopSpin(); - - pendingSync = false; - - // swap the instance as the previous callback list is being used asynchronously - callbacks = new ArrayList<>(); - - buffer.clear(); - - bufferLimit = 0; - - flushesDone.incrementAndGet(); - - return pos > 0; - } else { - return false; + } finally { + leaveCritical(CRITICAL_PATH_FLUSH); } } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index 2be2ff27c9..2cdaba1fa5 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -40,7 +40,9 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac final int bufferTimeout, IOCriticalErrorListener criticalErrorListener) { - super(directory, buffered, bufferSize, bufferTimeout, 1, false, criticalErrorListener); + // at the moment we only use the critical analyzer on the timed buffer + // MappedSequentialFile is not using any buffering, hence we just pass in null + super(directory, buffered, bufferSize, bufferTimeout, 1, false, criticalErrorListener, null); this.capacity = capacity; this.setDatasync(true); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index 781176e1f2..b585b24667 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.utils.Env; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory { @@ -42,7 +43,7 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor } public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, final int maxIO) { - this(journalDir, false, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener); + this(journalDir, false, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener, null); } public NIOSequentialFileFactory(final File journalDir, final boolean buffered, final int maxIO) { @@ -53,7 +54,7 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor final boolean buffered, final IOCriticalErrorListener listener, final int maxIO) { - this(journalDir, buffered, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener); + this(journalDir, buffered, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener, null); } public NIOSequentialFileFactory(final File journalDir, @@ -62,7 +63,7 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor final int bufferTimeout, final int maxIO, final boolean logRates) { - this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, null); + this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, null, null); } public NIOSequentialFileFactory(final File journalDir, @@ -71,8 +72,9 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor final int bufferTimeout, final int maxIO, final boolean logRates, - final IOCriticalErrorListener listener) { - super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener); + final IOCriticalErrorListener listener, + final CriticalAnalyzer analyzer) { + super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer); this.bufferPooling = true; this.bytesPool = new ThreadLocal<>(); } diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java index b0096b7f9d..45625850da 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java @@ -66,10 +66,10 @@ public class JournalTptBenchmark { .setDatasync(dataSync); break; case Nio: - factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); + factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync); break; case Aio: - factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync); + factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync); //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse(); if (!LibaioContext.isLoaded()) { throw new IllegalStateException("lib AIO not loaded!"); diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java index ed14ae426f..8ba0ccc5e0 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java @@ -59,10 +59,10 @@ public class SequentialFileTptBenchmark { factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync); break; case Nio: - factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); + factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync); break; case Aio: - factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync); + factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync); //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse(); if (!LibaioContext.isLoaded()) { throw new IllegalStateException("lib AIO not loaded!"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 84adde4e6c..160d12daa8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -133,11 +133,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager { case NIO: ActiveMQServerLogger.LOGGER.journalUseNIO(); - journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener); + journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener, getCriticalAnalyzer()); break; case ASYNCIO: ActiveMQServerLogger.LOGGER.journalUseAIO(); - journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener); + journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener, getCriticalAnalyzer()); break; case MAPPED: ActiveMQServerLogger.LOGGER.journalUseMAPPED(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java index 2462ee7fb6..3619db2719 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java @@ -87,7 +87,7 @@ public class TimedBufferTest extends ActiveMQTestBase { } } - TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS, false); + TimedBuffer timedBuffer = new TimedBuffer(null, 100, TimedBufferTest.ONE_SECOND_IN_NANOS, false); timedBuffer.start(); @@ -155,7 +155,7 @@ public class TimedBufferTest extends ActiveMQTestBase { } } - TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false); + TimedBuffer timedBuffer = new TimedBuffer(null, 100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false); timedBuffer.start(); @@ -393,7 +393,7 @@ public class TimedBufferTest extends ActiveMQTestBase { //it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match final long deviceTime = timeout; final int bufferSize = Env.osPageSize(); - final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false); + final TimedBuffer timedBuffer = new TimedBuffer(null, bufferSize, (int) timeout, false); timedBuffer.start(); try (NonBlockingObserver observer = new NonBlockingObserver(bufferSize, deviceTime)) { timedBuffer.setObserver(observer); @@ -434,7 +434,7 @@ public class TimedBufferTest extends ActiveMQTestBase { //it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match final long deviceTime = timeout; final int bufferSize = Env.osPageSize(); - final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false); + final TimedBuffer timedBuffer = new TimedBuffer(null, bufferSize, (int) timeout, false); timedBuffer.start(); try (BlockingObserver observer = new BlockingObserver(bufferSize, deviceTime)) { timedBuffer.setObserver(observer); @@ -489,7 +489,7 @@ public class TimedBufferTest extends ActiveMQTestBase { } } - TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 10, false); + TimedBuffer timedBuffer = new TimedBuffer(null, 100, TimedBufferTest.ONE_SECOND_IN_NANOS / 10, false); timedBuffer.start();