ARTEMIS-1474 TimedBuffer improved doc and refactored dead brenches on methods
This commit is contained in:
parent
9a1291279e
commit
f5dfbf7f12
|
@ -34,6 +34,9 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
|||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
public final class TimedBuffer {
|
||||
|
||||
private static final double MAX_TIMEOUT_ERROR_FACTOR = 1.5;
|
||||
|
||||
// Constants -----------------------------------------------------
|
||||
|
||||
// The number of tries on sleep before switching to spin
|
||||
|
@ -84,9 +87,6 @@ public final class TimedBuffer {
|
|||
|
||||
private TimerTask logRatesTimerTask;
|
||||
|
||||
//used only in the timerThread do not synchronization
|
||||
private boolean useSleep = true;
|
||||
|
||||
// no need to be volatile as every access is synchronized
|
||||
private boolean spinning = false;
|
||||
|
||||
|
@ -269,20 +269,21 @@ public final class TimedBuffer {
|
|||
}
|
||||
|
||||
public void flush() {
|
||||
flush(false);
|
||||
flushBatch();
|
||||
}
|
||||
|
||||
/**
|
||||
* force means the Journal is moving to a new file. Any pending write need to be done immediately
|
||||
* or data could be lost
|
||||
* Attempts to flush if {@code !delayFlush} and {@code buffer} is filled by any data.
|
||||
*
|
||||
* @return {@code true} when are flushed any bytes, {@code false} otherwise
|
||||
*/
|
||||
public void flush(final boolean force) {
|
||||
public boolean flushBatch() {
|
||||
synchronized (this) {
|
||||
if (!started) {
|
||||
throw new IllegalStateException("TimedBuffer is not started");
|
||||
}
|
||||
|
||||
if ((force || !delayFlush) && buffer.writerIndex() > 0) {
|
||||
if (!delayFlush && buffer.writerIndex() > 0) {
|
||||
int pos = buffer.writerIndex();
|
||||
|
||||
if (logRates) {
|
||||
|
@ -310,6 +311,10 @@ public final class TimedBuffer {
|
|||
bufferLimit = 0;
|
||||
|
||||
flushesDone.incrementAndGet();
|
||||
|
||||
return pos > 0;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -373,6 +378,7 @@ public final class TimedBuffer {
|
|||
@Override
|
||||
public void run() {
|
||||
long lastFlushTime = System.nanoTime();
|
||||
boolean useSleep = true;
|
||||
|
||||
while (!closed) {
|
||||
// We flush on the timer if there are pending syncs there and we've waited at least one
|
||||
|
@ -384,20 +390,24 @@ public final class TimedBuffer {
|
|||
if (useSleep) {
|
||||
// if using sleep, we will always flush
|
||||
lastFlushTime = System.nanoTime();
|
||||
flush();
|
||||
if (flushBatch()) {
|
||||
//it could wait until the timeout is expired
|
||||
final long timeFromTheLastFlush = System.nanoTime() - lastFlushTime;
|
||||
|
||||
// example: Say the device took 20% of the time to write..
|
||||
// We only need to wait 80% more..
|
||||
// timeFromTheLastFlush would be the difference
|
||||
// And if the device took more than that time, there's no need to wait at all.
|
||||
final long timeToSleep = timeout - timeFromTheLastFlush;
|
||||
if (timeToSleep > 0) {
|
||||
useSleep = sleepIfPossible(timeToSleep);
|
||||
}
|
||||
}
|
||||
} else if (bufferObserver != null && System.nanoTime() - lastFlushTime > timeout) {
|
||||
lastFlushTime = System.nanoTime();
|
||||
// if not using flush we will spin and do the time checks manually
|
||||
flush();
|
||||
}
|
||||
|
||||
}
|
||||
//it could wait until the timeout is expired
|
||||
final long timeFromTheLastFlush = System.nanoTime() - lastFlushTime;
|
||||
final long timeToSleep = timeout - timeFromTheLastFlush;
|
||||
if (timeToSleep > 0) {
|
||||
sleepIfPossible(timeToSleep);
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -413,34 +423,35 @@ public final class TimedBuffer {
|
|||
}
|
||||
|
||||
/**
|
||||
* We will attempt to use sleep only if the system supports nano-sleep
|
||||
* We will attempt to use sleep only if the system supports nano-sleep.
|
||||
* we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
|
||||
* if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
|
||||
*/
|
||||
private void sleepIfPossible(long nanosToSleep) {
|
||||
if (useSleep) {
|
||||
try {
|
||||
final long startSleep = System.nanoTime();
|
||||
sleep(nanosToSleep);
|
||||
private boolean sleepIfPossible(long nanosToSleep) {
|
||||
boolean useSleep = true;
|
||||
try {
|
||||
final long startSleep = System.nanoTime();
|
||||
sleep(nanosToSleep);
|
||||
if (checks < MAX_CHECKS_ON_SLEEP) {
|
||||
final long elapsedSleep = System.nanoTime() - startSleep;
|
||||
if (checks < MAX_CHECKS_ON_SLEEP) {
|
||||
// I'm letting the real time to be up to 50% than the requested sleep.
|
||||
if (elapsedSleep > nanosToSleep * 1.5) {
|
||||
failedChecks++;
|
||||
}
|
||||
// I'm letting the real time to be up to 50% than the requested sleep.
|
||||
if (elapsedSleep > (nanosToSleep * MAX_TIMEOUT_ERROR_FACTOR)) {
|
||||
failedChecks++;
|
||||
}
|
||||
|
||||
if (++checks >= MAX_CHECKS_ON_SLEEP) {
|
||||
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
|
||||
ActiveMQJournalLogger.LOGGER.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
|
||||
useSleep = false;
|
||||
}
|
||||
|
||||
if (++checks >= MAX_CHECKS_ON_SLEEP) {
|
||||
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
|
||||
ActiveMQJournalLogger.LOGGER.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
|
||||
useSleep = false;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
useSleep = false;
|
||||
ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
useSleep = false;
|
||||
ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
|
||||
}
|
||||
return useSleep;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
|
@ -486,4 +497,4 @@ public final class TimedBuffer {
|
|||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue