This commit is contained in:
Clebert Suconic 2017-10-23 14:53:23 -04:00
commit 0ef2c15ab7
1 changed files with 51 additions and 36 deletions

View File

@ -32,8 +32,14 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.jboss.logging.Logger;
public final class TimedBuffer {
private static final Logger logger = Logger.getLogger(TimedBuffer.class);
private static final double MAX_TIMEOUT_ERROR_FACTOR = 1.5;
// Constants -----------------------------------------------------
// The number of tries on sleep before switching to spin
@ -84,9 +90,6 @@ public final class TimedBuffer {
private TimerTask logRatesTimerTask;
//used only in the timerThread do not synchronization
private boolean useSleep = true;
// no need to be volatile as every access is synchronized
private boolean spinning = false;
@ -269,20 +272,21 @@ public final class TimedBuffer {
}
public void flush() {
flush(false);
flushBatch();
}
/**
* force means the Journal is moving to a new file. Any pending write need to be done immediately
* or data could be lost
* Attempts to flush if {@code !delayFlush} and {@code buffer} is filled by any data.
*
* @return {@code true} when are flushed any bytes, {@code false} otherwise
*/
public void flush(final boolean force) {
public boolean flushBatch() {
synchronized (this) {
if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
}
if ((force || !delayFlush) && buffer.writerIndex() > 0) {
if (!delayFlush && buffer.writerIndex() > 0) {
int pos = buffer.writerIndex();
if (logRates) {
@ -310,6 +314,10 @@ public final class TimedBuffer {
bufferLimit = 0;
flushesDone.incrementAndGet();
return pos > 0;
} else {
return false;
}
}
}
@ -373,6 +381,7 @@ public final class TimedBuffer {
@Override
public void run() {
long lastFlushTime = System.nanoTime();
boolean useSleep = true;
while (!closed) {
// We flush on the timer if there are pending syncs there and we've waited at least one
@ -384,20 +393,24 @@ public final class TimedBuffer {
if (useSleep) {
// if using sleep, we will always flush
lastFlushTime = System.nanoTime();
flush();
if (flushBatch()) {
//it could wait until the timeout is expired
final long timeFromTheLastFlush = System.nanoTime() - lastFlushTime;
// example: Say the device took 20% of the time to write..
// We only need to wait 80% more..
// timeFromTheLastFlush would be the difference
// And if the device took more than that time, there's no need to wait at all.
final long timeToSleep = timeout - timeFromTheLastFlush;
if (timeToSleep > 0) {
useSleep = sleepIfPossible(timeToSleep);
}
}
} else if (bufferObserver != null && System.nanoTime() - lastFlushTime > timeout) {
lastFlushTime = System.nanoTime();
// if not using flush we will spin and do the time checks manually
flush();
}
}
//it could wait until the timeout is expired
final long timeFromTheLastFlush = System.nanoTime() - lastFlushTime;
final long timeToSleep = timeout - timeFromTheLastFlush;
if (timeToSleep > 0) {
sleepIfPossible(timeToSleep);
}
try {
@ -413,34 +426,36 @@ public final class TimedBuffer {
}
/**
* We will attempt to use sleep only if the system supports nano-sleep
* We will attempt to use sleep only if the system supports nano-sleep.
* we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
* if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
*/
private void sleepIfPossible(long nanosToSleep) {
if (useSleep) {
try {
final long startSleep = System.nanoTime();
sleep(nanosToSleep);
private boolean sleepIfPossible(long nanosToSleep) {
boolean useSleep = true;
try {
final long startSleep = System.nanoTime();
sleep(nanosToSleep);
if (checks < MAX_CHECKS_ON_SLEEP) {
final long elapsedSleep = System.nanoTime() - startSleep;
if (checks < MAX_CHECKS_ON_SLEEP) {
// I'm letting the real time to be up to 50% than the requested sleep.
if (elapsedSleep > nanosToSleep * 1.5) {
failedChecks++;
}
// I'm letting the real time to be up to 50% than the requested sleep.
if (elapsedSleep > (nanosToSleep * MAX_TIMEOUT_ERROR_FACTOR)) {
failedChecks++;
}
if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
ActiveMQJournalLogger.LOGGER.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
useSleep = false;
}
if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
logger.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
useSleep = false;
}
}
} catch (Exception e) {
useSleep = false;
ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
}
} catch (Exception e) {
useSleep = false;
// I don't think we need to individualize a logger code here, this is unlikely to happen anyways
logger.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
}
return useSleep;
}
public void close() {
@ -486,4 +501,4 @@ public final class TimedBuffer {
}
}
}
}