ARTEMIS-1489 Adding Timed Buffer into Critical Analyzer

This commit is contained in:
Clebert Suconic 2017-10-30 12:10:59 -04:00 committed by Justin Bertram
parent 5a476a189d
commit 2bf690e21b
11 changed files with 86 additions and 56 deletions

View File

@ -147,6 +147,8 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);
} }
} }
actions.clear();
} }
@Override @Override

View File

@ -25,6 +25,10 @@ public class CriticalComponentImpl implements CriticalComponent {
private final CriticalMeasure[] measures; private final CriticalMeasure[] measures;
private final CriticalAnalyzer analyzer; private final CriticalAnalyzer analyzer;
public CriticalAnalyzer getCriticalAnalyzer() {
return analyzer;
}
public CriticalComponentImpl(CriticalAnalyzer analyzer, int numberOfPaths) { public CriticalComponentImpl(CriticalAnalyzer analyzer, int numberOfPaths) {
if (analyzer == null) { if (analyzer == null) {
analyzer = EmptyCriticalAnalyzer.getInstance(); analyzer = EmptyCriticalAnalyzer.getInstance();

View File

@ -33,6 +33,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
/** /**
@ -74,11 +76,17 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
final int bufferTimeout, final int bufferTimeout,
final int maxIO, final int maxIO,
final boolean logRates, final boolean logRates,
final IOCriticalErrorListener criticalErrorListener) { final IOCriticalErrorListener criticalErrorListener,
CriticalAnalyzer criticalAnalyzer) {
this.journalDir = journalDir; this.journalDir = journalDir;
if (criticalAnalyzer == null) {
criticalAnalyzer = EmptyCriticalAnalyzer.getInstance();
}
if (buffered && bufferTimeout > 0) { if (buffered && bufferTimeout > 0) {
timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates); timedBuffer = new TimedBuffer(criticalAnalyzer, bufferSize, bufferTimeout, logRates);
criticalAnalyzer.add(timedBuffer);
} else { } else {
timedBuffer = null; timedBuffer = null;
} }

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.jlibaio.SubmitInfo; import org.apache.activemq.artemis.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.jlibaio.util.CallbackCache; import org.apache.activemq.artemis.jlibaio.util.CallbackCache;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory { public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory {
@ -56,11 +57,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
private static final String AIO_TEST_FILE = ".aio-test"; private static final String AIO_TEST_FILE = ".aio-test";
public AIOSequentialFileFactory(final File journalDir, int maxIO) { public AIOSequentialFileFactory(final File journalDir, int maxIO) {
this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null); this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null, null);
} }
public AIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, int maxIO) { public AIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, int maxIO) {
this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, listener); this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, listener, null);
} }
public AIOSequentialFileFactory(final File journalDir, public AIOSequentialFileFactory(final File journalDir,
@ -68,7 +69,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
final int bufferTimeout, final int bufferTimeout,
final int maxIO, final int maxIO,
final boolean logRates) { final boolean logRates) {
this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null); this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null, null);
} }
public AIOSequentialFileFactory(final File journalDir, public AIOSequentialFileFactory(final File journalDir,
@ -76,8 +77,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
final int bufferTimeout, final int bufferTimeout,
final int maxIO, final int maxIO,
final boolean logRates, final boolean logRates,
final IOCriticalErrorListener listener) { final IOCriticalErrorListener listener,
super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener); final CriticalAnalyzer analyzer) {
super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer);
callbackPool = new CallbackCache<>(maxIO); callbackPool = new CallbackCache<>(maxIO);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("New AIO File Created"); logger.trace("New AIO File Created");

View File

@ -32,9 +32,14 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
public final class TimedBuffer { public final class TimedBuffer extends CriticalComponentImpl {
protected static final int CRITICAL_PATHS = 1;
protected static final int CRITICAL_PATH_FLUSH = 0;
private static final Logger logger = Logger.getLogger(TimedBuffer.class); private static final Logger logger = Logger.getLogger(TimedBuffer.class);
@ -99,7 +104,8 @@ public final class TimedBuffer {
// Public -------------------------------------------------------- // Public --------------------------------------------------------
public TimedBuffer(final int size, final int timeout, final boolean logRates) { public TimedBuffer(CriticalAnalyzer analyzer, final int size, final int timeout, final boolean logRates) {
super(analyzer, CRITICAL_PATHS);
bufferSize = size; bufferSize = size;
this.logRates = logRates; this.logRates = logRates;
@ -286,38 +292,42 @@ public final class TimedBuffer {
throw new IllegalStateException("TimedBuffer is not started"); throw new IllegalStateException("TimedBuffer is not started");
} }
if (!delayFlush && buffer.writerIndex() > 0) { enterCritical(CRITICAL_PATH_FLUSH);
int pos = buffer.writerIndex(); try {
if (!delayFlush && buffer.writerIndex() > 0) {
int pos = buffer.writerIndex();
if (logRates) { if (logRates) {
bytesFlushed.addAndGet(pos); bytesFlushed.addAndGet(pos);
}
final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
//bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
bufferToFlush.limit(pos);
//perform memcpy under the hood due to the off heap buffer
buffer.getBytes(0, bufferToFlush);
bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
stopSpin();
pendingSync = false;
// swap the instance as the previous callback list is being used asynchronously
callbacks = new ArrayList<>();
buffer.clear();
bufferLimit = 0;
flushesDone.incrementAndGet();
return pos > 0;
} else {
return false;
} }
} finally {
final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); leaveCritical(CRITICAL_PATH_FLUSH);
//bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
bufferToFlush.limit(pos);
//perform memcpy under the hood due to the off heap buffer
buffer.getBytes(0, bufferToFlush);
bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
stopSpin();
pendingSync = false;
// swap the instance as the previous callback list is being used asynchronously
callbacks = new ArrayList<>();
buffer.clear();
bufferLimit = 0;
flushesDone.incrementAndGet();
return pos > 0;
} else {
return false;
} }
} }
} }

View File

@ -40,7 +40,9 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
final int bufferTimeout, final int bufferTimeout,
IOCriticalErrorListener criticalErrorListener) { IOCriticalErrorListener criticalErrorListener) {
super(directory, buffered, bufferSize, bufferTimeout, 1, false, criticalErrorListener); // at the moment we only use the critical analyzer on the timed buffer
// MappedSequentialFile is not using any buffering, hence we just pass in null
super(directory, buffered, bufferSize, bufferTimeout, 1, false, criticalErrorListener, null);
this.capacity = capacity; this.capacity = capacity;
this.setDatasync(true); this.setDatasync(true);

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory { public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
@ -42,7 +43,7 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor
} }
public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, final int maxIO) { public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, final int maxIO) {
this(journalDir, false, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener); this(journalDir, false, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener, null);
} }
public NIOSequentialFileFactory(final File journalDir, final boolean buffered, final int maxIO) { public NIOSequentialFileFactory(final File journalDir, final boolean buffered, final int maxIO) {
@ -53,7 +54,7 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor
final boolean buffered, final boolean buffered,
final IOCriticalErrorListener listener, final IOCriticalErrorListener listener,
final int maxIO) { final int maxIO) {
this(journalDir, buffered, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener); this(journalDir, buffered, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener, null);
} }
public NIOSequentialFileFactory(final File journalDir, public NIOSequentialFileFactory(final File journalDir,
@ -62,7 +63,7 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor
final int bufferTimeout, final int bufferTimeout,
final int maxIO, final int maxIO,
final boolean logRates) { final boolean logRates) {
this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, null); this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, null, null);
} }
public NIOSequentialFileFactory(final File journalDir, public NIOSequentialFileFactory(final File journalDir,
@ -71,8 +72,9 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor
final int bufferTimeout, final int bufferTimeout,
final int maxIO, final int maxIO,
final boolean logRates, final boolean logRates,
final IOCriticalErrorListener listener) { final IOCriticalErrorListener listener,
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener); final CriticalAnalyzer analyzer) {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer);
this.bufferPooling = true; this.bufferPooling = true;
this.bytesPool = new ThreadLocal<>(); this.bytesPool = new ThreadLocal<>();
} }

View File

@ -66,10 +66,10 @@ public class JournalTptBenchmark {
.setDatasync(dataSync); .setDatasync(dataSync);
break; break;
case Nio: case Nio:
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync);
break; break;
case Aio: case Aio:
factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync); factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync);
//disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse(); //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
if (!LibaioContext.isLoaded()) { if (!LibaioContext.isLoaded()) {
throw new IllegalStateException("lib AIO not loaded!"); throw new IllegalStateException("lib AIO not loaded!");

View File

@ -59,10 +59,10 @@ public class SequentialFileTptBenchmark {
factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync); factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
break; break;
case Nio: case Nio:
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync);
break; break;
case Aio: case Aio:
factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync); factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync);
//disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse(); //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
if (!LibaioContext.isLoaded()) { if (!LibaioContext.isLoaded()) {
throw new IllegalStateException("lib AIO not loaded!"); throw new IllegalStateException("lib AIO not loaded!");

View File

@ -133,11 +133,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
case NIO: case NIO:
ActiveMQServerLogger.LOGGER.journalUseNIO(); ActiveMQServerLogger.LOGGER.journalUseNIO();
journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener); journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener, getCriticalAnalyzer());
break; break;
case ASYNCIO: case ASYNCIO:
ActiveMQServerLogger.LOGGER.journalUseAIO(); ActiveMQServerLogger.LOGGER.journalUseAIO();
journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener); journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener, getCriticalAnalyzer());
break; break;
case MAPPED: case MAPPED:
ActiveMQServerLogger.LOGGER.journalUseMAPPED(); ActiveMQServerLogger.LOGGER.journalUseMAPPED();

View File

@ -87,7 +87,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
} }
} }
TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS, false); TimedBuffer timedBuffer = new TimedBuffer(null, 100, TimedBufferTest.ONE_SECOND_IN_NANOS, false);
timedBuffer.start(); timedBuffer.start();
@ -155,7 +155,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
} }
} }
TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false); TimedBuffer timedBuffer = new TimedBuffer(null, 100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false);
timedBuffer.start(); timedBuffer.start();
@ -393,7 +393,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
//it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match //it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match
final long deviceTime = timeout; final long deviceTime = timeout;
final int bufferSize = Env.osPageSize(); final int bufferSize = Env.osPageSize();
final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false); final TimedBuffer timedBuffer = new TimedBuffer(null, bufferSize, (int) timeout, false);
timedBuffer.start(); timedBuffer.start();
try (NonBlockingObserver observer = new NonBlockingObserver(bufferSize, deviceTime)) { try (NonBlockingObserver observer = new NonBlockingObserver(bufferSize, deviceTime)) {
timedBuffer.setObserver(observer); timedBuffer.setObserver(observer);
@ -434,7 +434,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
//it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match //it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match
final long deviceTime = timeout; final long deviceTime = timeout;
final int bufferSize = Env.osPageSize(); final int bufferSize = Env.osPageSize();
final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false); final TimedBuffer timedBuffer = new TimedBuffer(null, bufferSize, (int) timeout, false);
timedBuffer.start(); timedBuffer.start();
try (BlockingObserver observer = new BlockingObserver(bufferSize, deviceTime)) { try (BlockingObserver observer = new BlockingObserver(bufferSize, deviceTime)) {
timedBuffer.setObserver(observer); timedBuffer.setObserver(observer);
@ -489,7 +489,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
} }
} }
TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 10, false); TimedBuffer timedBuffer = new TimedBuffer(null, 100, TimedBufferTest.ONE_SECOND_IN_NANOS / 10, false);
timedBuffer.start(); timedBuffer.start();