diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index 7d5370b85e..1a50523955 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.io.buffer; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Timer; import java.util.TimerTask; @@ -37,6 +36,9 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; public final class TimedBuffer { // Constants ----------------------------------------------------- + // The number of tries on sleep before switching to spin + private static final int MAX_CHECKS_ON_SLEEP = 20; + // Attributes ---------------------------------------------------- private TimedBufferObserver bufferObserver; @@ -46,7 +48,7 @@ public final class TimedBuffer { // prevent that private final Semaphore spinLimiter = new Semaphore(1); - private CheckTimer timerRunnable = new CheckTimer(); + private CheckTimer timerRunnable = null; private final int bufferSize; @@ -58,7 +60,8 @@ public final class TimedBuffer { private final int timeout; - private final AtomicLong pendingSyncs = new AtomicLong(); + // used to measure sync requests. 
When a sync is requested, it shouldn't take more than timeout to happen + private volatile boolean pendingSync = false; private Thread timerThread; @@ -73,7 +76,7 @@ public final class TimedBuffer { private final boolean logRates; - private long bytesFlushed = 0; + private final AtomicLong bytesFlushed = new AtomicLong(0); private final AtomicLong flushesDone = new AtomicLong(0); @@ -81,6 +84,9 @@ public final class TimedBuffer { private TimerTask logRatesTimerTask; + //used only in the timerThread, so it does not need synchronization + private boolean useSleep = true; + // no need to be volatile as every access is synchronized private boolean spinning = false; @@ -99,16 +105,18 @@ public final class TimedBuffer { logRatesTimer = new Timer(true); } // Setting the interval for nano-sleeps + //prefer off heap buffer to allow further humongous allocations and reduce GC overhead //NOTE: it is used ByteBuffer::allocateDirect instead of Unpooled::directBuffer, because the latter could allocate //direct ByteBuffers with no Cleaner! 
buffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(ByteBuffer.allocateDirect(size))); + buffer.clear(); bufferLimit = 0; - callbacks = null; + callbacks = new ArrayList<>(); this.timeout = timeout; } @@ -232,14 +240,11 @@ public final class TimedBuffer { buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes); buffer.writerIndex(writerIndex + readableBytes); - if (callbacks == null) { - callbacks = new ArrayList<>(); - } callbacks.add(callback); if (sync) { - final long currentPendingSyncs = pendingSyncs.get(); - pendingSyncs.lazySet(currentPendingSyncs + 1); + pendingSync = true; + startSpin(); } } @@ -253,14 +258,11 @@ public final class TimedBuffer { bytes.encode(buffer); - if (callbacks == null) { - callbacks = new ArrayList<>(); - } callbacks.add(callback); if (sync) { - final long currentPendingSyncs = pendingSyncs.get(); - pendingSyncs.lazySet(currentPendingSyncs + 1); + pendingSync = true; + startSpin(); } @@ -274,14 +276,18 @@ public final class TimedBuffer { * force means the Journal is moving to a new file. Any pending write need to be done immediately * or data could be lost */ - private void flush(final boolean force) { + public void flush(final boolean force) { synchronized (this) { if (!started) { throw new IllegalStateException("TimedBuffer is not started"); } if ((force || !delayFlush) && buffer.writerIndex() > 0) { - final int pos = buffer.writerIndex(); + int pos = buffer.writerIndex(); + + if (logRates) { + bytesFlushed.addAndGet(pos); + } final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!! @@ -289,34 +295,25 @@ public final class TimedBuffer { //perform memcpy under the hood due to the off heap buffer buffer.getBytes(0, bufferToFlush); - final List ioCallbacks = callbacks == null ? 
Collections.emptyList() : callbacks; - bufferObserver.flushBuffer(bufferToFlush, pendingSyncs.get() > 0, ioCallbacks); + + bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks); stopSpin(); - pendingSyncs.lazySet(0); + pendingSync = false; - callbacks = null; + // swap the instance as the previous callback list is being used asynchronously + callbacks = new ArrayList<>(); buffer.clear(); bufferLimit = 0; - if (logRates) { - logFlushed(pos); - } + flushesDone.incrementAndGet(); } } } - private void logFlushed(int bytes) { - this.bytesFlushed += bytes; - //more lightweight than XADD if single writer - final long currentFlushesDone = flushesDone.get(); - //flushesDone::lazySet write-Release bytesFlushed - flushesDone.lazySet(currentFlushesDone + 1L); - } - // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -340,21 +337,21 @@ public final class TimedBuffer { if (!closed) { long now = System.currentTimeMillis(); - final long flushesDone = TimedBuffer.this.flushesDone.get(); - //flushesDone::get read-Acquire bytesFlushed - final long bytesFlushed = TimedBuffer.this.bytesFlushed; + long bytesF = bytesFlushed.get(); + long flushesD = flushesDone.get(); + if (lastExecution != 0) { - final double rate = 1000 * (double) (bytesFlushed - lastBytesFlushed) / (now - lastExecution); + double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution); ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024))); - final double flushRate = 1000 * (double) (flushesDone - lastFlushesDone) / (now - lastExecution); + double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution); ActiveMQJournalLogger.LOGGER.flushRate(flushRate); } lastExecution = now; - lastBytesFlushed = bytesFlushed; + lastBytesFlushed = bytesF; - lastFlushesDone = flushesDone; + lastFlushesDone = flushesD; } } @@ -370,40 +367,74 @@ public final class TimedBuffer { 
private volatile boolean closed = false; + int checks = 0; + int failedChecks = 0; + long timeBefore = 0; + @Override public void run() { - int waitTimes = 0; long lastFlushTime = 0; - long estimatedOptimalBatch = Runtime.getRuntime().availableProcessors(); - final Semaphore spinLimiter = TimedBuffer.this.spinLimiter; - final long timeout = TimedBuffer.this.timeout; while (!closed) { - boolean flushed = false; - final long currentPendingSyncs = pendingSyncs.get(); + // We flush on the timer if there are pending syncs there and we've waited at least one + // timeout since the time of the last flush. + // Effectively flushing "resets" the timer + // On the timeout verification, notice that we ignore the timeout check if we are using sleep - if (currentPendingSyncs > 0) { - if (bufferObserver != null) { - final boolean checkpoint = System.nanoTime() > lastFlushTime + timeout; - if (checkpoint || currentPendingSyncs >= estimatedOptimalBatch) { - flush(); - if (checkpoint) { - estimatedOptimalBatch = currentPendingSyncs; - } else { - estimatedOptimalBatch = Math.max(estimatedOptimalBatch, currentPendingSyncs); - } - lastFlushTime = System.nanoTime(); - //a flush has been requested - flushed = true; - } + if (pendingSync) { + if (useSleep) { + // if using sleep, we will always flush + flush(); + lastFlushTime = System.nanoTime(); + } else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) { + // if not using sleep we will spin and do the time checks manually + flush(); + lastFlushTime = System.nanoTime(); } + } - if (flushed) { - waitTimes = 0; - } else { - //instead of interruptible sleeping, perform progressive parks depending on the load - waitTimes = TimedBuffer.wait(waitTimes, spinLimiter); + sleepIfPossible(); + + try { + spinLimiter.acquire(); + + Thread.yield(); + + spinLimiter.release(); + } catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + } + } + + /** + * We will attempt to use sleep only if the system 
supports nano-sleep + * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well. + * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin + */ + private void sleepIfPossible() { + if (useSleep) { + if (checks < MAX_CHECKS_ON_SLEEP) { + timeBefore = System.nanoTime(); + } + + LockSupport.parkNanos(timeout); + + if (checks < MAX_CHECKS_ON_SLEEP) { + long realTimeSleep = System.nanoTime() - timeBefore; + + // I'm letting the real time be up to 50% more than the requested sleep. + if (realTimeSleep > timeout * 1.5) { + failedChecks++; + } + + if (++checks >= MAX_CHECKS_ON_SLEEP) { + if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) { + ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts"); + useSleep = false; + } + } } } } @@ -413,35 +444,6 @@ public final class TimedBuffer { } } - private static int wait(int waitTimes, Semaphore spinLimiter) { - if (waitTimes < 10) { - //doesn't make sense to spin loop here, because of the lock around flush/addBytes operations! - Thread.yield(); - waitTimes++; - } else if (waitTimes < 20) { - LockSupport.parkNanos(1L); - waitTimes++; - } else if (waitTimes < 50) { - LockSupport.parkNanos(10L); - waitTimes++; - } else if (waitTimes < 100) { - LockSupport.parkNanos(100L); - waitTimes++; - } else if (waitTimes < 1000) { - LockSupport.parkNanos(1000L); - waitTimes++; - } else { - LockSupport.parkNanos(100_000L); - try { - spinLimiter.acquire(); - spinLimiter.release(); - } catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } - } - return waitTimes; - } - /** * Sub classes (tests basically) can use this to override disabling spinning */