ARTEMIS-1294 Reverted TimedBuffer timeout policy

This commit is contained in:
Francesco Nigro 2017-07-17 17:35:38 +02:00 committed by Clebert Suconic
parent 8f500986a0
commit 3dc9566fb6
1 changed files with 95 additions and 93 deletions

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.io.buffer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
@ -37,6 +36,9 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public final class TimedBuffer { public final class TimedBuffer {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
// The number of tries on sleep before switching to spin
private static final int MAX_CHECKS_ON_SLEEP = 20;
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
private TimedBufferObserver bufferObserver; private TimedBufferObserver bufferObserver;
@ -46,7 +48,7 @@ public final class TimedBuffer {
// prevent that // prevent that
private final Semaphore spinLimiter = new Semaphore(1); private final Semaphore spinLimiter = new Semaphore(1);
private CheckTimer timerRunnable = new CheckTimer(); private CheckTimer timerRunnable = null;
private final int bufferSize; private final int bufferSize;
@ -58,7 +60,8 @@ public final class TimedBuffer {
private final int timeout; private final int timeout;
private final AtomicLong pendingSyncs = new AtomicLong(); // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
private volatile boolean pendingSync = false;
private Thread timerThread; private Thread timerThread;
@ -73,7 +76,7 @@ public final class TimedBuffer {
private final boolean logRates; private final boolean logRates;
private long bytesFlushed = 0; private final AtomicLong bytesFlushed = new AtomicLong(0);
private final AtomicLong flushesDone = new AtomicLong(0); private final AtomicLong flushesDone = new AtomicLong(0);
@ -81,6 +84,9 @@ public final class TimedBuffer {
private TimerTask logRatesTimerTask; private TimerTask logRatesTimerTask;
//used only in the timerThread do not synchronization
private boolean useSleep = true;
// no need to be volatile as every access is synchronized // no need to be volatile as every access is synchronized
private boolean spinning = false; private boolean spinning = false;
@ -99,16 +105,18 @@ public final class TimedBuffer {
logRatesTimer = new Timer(true); logRatesTimer = new Timer(true);
} }
// Setting the interval for nano-sleeps // Setting the interval for nano-sleeps
//prefer off heap buffer to allow further humongous allocations and reduce GC overhead //prefer off heap buffer to allow further humongous allocations and reduce GC overhead
//NOTE: it is used ByteBuffer::allocateDirect instead of Unpooled::directBuffer, because the latter could allocate //NOTE: it is used ByteBuffer::allocateDirect instead of Unpooled::directBuffer, because the latter could allocate
//direct ByteBuffers with no Cleaner! //direct ByteBuffers with no Cleaner!
buffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(ByteBuffer.allocateDirect(size))); buffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(ByteBuffer.allocateDirect(size)));
buffer.clear(); buffer.clear();
bufferLimit = 0; bufferLimit = 0;
callbacks = null; callbacks = new ArrayList<>();
this.timeout = timeout; this.timeout = timeout;
} }
@ -232,14 +240,11 @@ public final class TimedBuffer {
buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes); buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
buffer.writerIndex(writerIndex + readableBytes); buffer.writerIndex(writerIndex + readableBytes);
if (callbacks == null) {
callbacks = new ArrayList<>();
}
callbacks.add(callback); callbacks.add(callback);
if (sync) { if (sync) {
final long currentPendingSyncs = pendingSyncs.get(); pendingSync = true;
pendingSyncs.lazySet(currentPendingSyncs + 1);
startSpin(); startSpin();
} }
} }
@ -253,14 +258,11 @@ public final class TimedBuffer {
bytes.encode(buffer); bytes.encode(buffer);
if (callbacks == null) {
callbacks = new ArrayList<>();
}
callbacks.add(callback); callbacks.add(callback);
if (sync) { if (sync) {
final long currentPendingSyncs = pendingSyncs.get(); pendingSync = true;
pendingSyncs.lazySet(currentPendingSyncs + 1);
startSpin(); startSpin();
} }
@ -274,14 +276,18 @@ public final class TimedBuffer {
* force means the Journal is moving to a new file. Any pending write need to be done immediately * force means the Journal is moving to a new file. Any pending write need to be done immediately
* or data could be lost * or data could be lost
*/ */
private void flush(final boolean force) { public void flush(final boolean force) {
synchronized (this) { synchronized (this) {
if (!started) { if (!started) {
throw new IllegalStateException("TimedBuffer is not started"); throw new IllegalStateException("TimedBuffer is not started");
} }
if ((force || !delayFlush) && buffer.writerIndex() > 0) { if ((force || !delayFlush) && buffer.writerIndex() > 0) {
final int pos = buffer.writerIndex(); int pos = buffer.writerIndex();
if (logRates) {
bytesFlushed.addAndGet(pos);
}
final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
//bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!! //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
@ -289,33 +295,24 @@ public final class TimedBuffer {
//perform memcpy under the hood due to the off heap buffer //perform memcpy under the hood due to the off heap buffer
buffer.getBytes(0, bufferToFlush); buffer.getBytes(0, bufferToFlush);
final List<IOCallback> ioCallbacks = callbacks == null ? Collections.emptyList() : callbacks;
bufferObserver.flushBuffer(bufferToFlush, pendingSyncs.get() > 0, ioCallbacks); bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
stopSpin(); stopSpin();
pendingSyncs.lazySet(0); pendingSync = false;
callbacks = null; // swap the instance as the previous callback list is being used asynchronously
callbacks = new ArrayList<>();
buffer.clear(); buffer.clear();
bufferLimit = 0; bufferLimit = 0;
if (logRates) { flushesDone.incrementAndGet();
logFlushed(pos);
} }
} }
} }
}
private void logFlushed(int bytes) {
this.bytesFlushed += bytes;
//more lightweight than XADD if single writer
final long currentFlushesDone = flushesDone.get();
//flushesDone::lazySet write-Release bytesFlushed
flushesDone.lazySet(currentFlushesDone + 1L);
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
@ -340,21 +337,21 @@ public final class TimedBuffer {
if (!closed) { if (!closed) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
final long flushesDone = TimedBuffer.this.flushesDone.get(); long bytesF = bytesFlushed.get();
//flushesDone::get read-Acquire bytesFlushed long flushesD = flushesDone.get();
final long bytesFlushed = TimedBuffer.this.bytesFlushed;
if (lastExecution != 0) { if (lastExecution != 0) {
final double rate = 1000 * (double) (bytesFlushed - lastBytesFlushed) / (now - lastExecution); double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024))); ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
final double flushRate = 1000 * (double) (flushesDone - lastFlushesDone) / (now - lastExecution); double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.flushRate(flushRate); ActiveMQJournalLogger.LOGGER.flushRate(flushRate);
} }
lastExecution = now; lastExecution = now;
lastBytesFlushed = bytesFlushed; lastBytesFlushed = bytesF;
lastFlushesDone = flushesDone; lastFlushesDone = flushesD;
} }
} }
@ -370,40 +367,74 @@ public final class TimedBuffer {
private volatile boolean closed = false; private volatile boolean closed = false;
int checks = 0;
int failedChecks = 0;
long timeBefore = 0;
@Override @Override
public void run() { public void run() {
int waitTimes = 0;
long lastFlushTime = 0; long lastFlushTime = 0;
long estimatedOptimalBatch = Runtime.getRuntime().availableProcessors();
final Semaphore spinLimiter = TimedBuffer.this.spinLimiter;
final long timeout = TimedBuffer.this.timeout;
while (!closed) { while (!closed) {
boolean flushed = false; // We flush on the timer if there are pending syncs there and we've waited at least one
final long currentPendingSyncs = pendingSyncs.get(); // timeout since the time of the last flush.
// Effectively flushing "resets" the timer
// On the timeout verification, notice that we ignore the timeout check if we are using sleep
if (currentPendingSyncs > 0) { if (pendingSync) {
if (bufferObserver != null) { if (useSleep) {
final boolean checkpoint = System.nanoTime() > lastFlushTime + timeout; // if using sleep, we will always flush
if (checkpoint || currentPendingSyncs >= estimatedOptimalBatch) {
flush(); flush();
if (checkpoint) {
estimatedOptimalBatch = currentPendingSyncs;
} else {
estimatedOptimalBatch = Math.max(estimatedOptimalBatch, currentPendingSyncs);
}
lastFlushTime = System.nanoTime(); lastFlushTime = System.nanoTime();
//a flush has been requested } else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) {
flushed = true; // if not using flush we will spin and do the time checks manually
flush();
lastFlushTime = System.nanoTime();
}
}
sleepIfPossible();
try {
spinLimiter.acquire();
Thread.yield();
spinLimiter.release();
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} }
} }
} }
if (flushed) { /**
waitTimes = 0; * We will attempt to use sleep only if the system supports nano-sleep
} else { * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
//instead of interruptible sleeping, perform progressive parks depending on the load * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
waitTimes = TimedBuffer.wait(waitTimes, spinLimiter); */
private void sleepIfPossible() {
if (useSleep) {
if (checks < MAX_CHECKS_ON_SLEEP) {
timeBefore = System.nanoTime();
}
LockSupport.parkNanos(timeout);
if (checks < MAX_CHECKS_ON_SLEEP) {
long realTimeSleep = System.nanoTime() - timeBefore;
// I'm letting the real time to be up to 50% than the requested sleep.
if (realTimeSleep > timeout * 1.5) {
failedChecks++;
}
if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
useSleep = false;
}
}
} }
} }
} }
@ -413,35 +444,6 @@ public final class TimedBuffer {
} }
} }
private static int wait(int waitTimes, Semaphore spinLimiter) {
if (waitTimes < 10) {
//doesn't make sense to spin loop here, because of the lock around flush/addBytes operations!
Thread.yield();
waitTimes++;
} else if (waitTimes < 20) {
LockSupport.parkNanos(1L);
waitTimes++;
} else if (waitTimes < 50) {
LockSupport.parkNanos(10L);
waitTimes++;
} else if (waitTimes < 100) {
LockSupport.parkNanos(100L);
waitTimes++;
} else if (waitTimes < 1000) {
LockSupport.parkNanos(1000L);
waitTimes++;
} else {
LockSupport.parkNanos(100_000L);
try {
spinLimiter.acquire();
spinLimiter.release();
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
return waitTimes;
}
/** /**
* Sub classes (tests basically) can use this to override disabling spinning * Sub classes (tests basically) can use this to override disabling spinning
*/ */