ARTEMIS-1312 TimedBuffer doubled timeout with blocking flush

This commit is contained in:
Francesco Nigro 2017-07-28 16:25:53 +02:00 committed by Clebert Suconic
parent 7973410355
commit 567bfe3b9b
2 changed files with 273 additions and 36 deletions

View File

@ -23,6 +23,7 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -368,14 +369,10 @@ public final class TimedBuffer {
int checks = 0;
int failedChecks = 0;
long timeBefore = 0;
final int sleepMillis = timeout / 1000000; // truncates
final int sleepNanos = timeout % 1000000;
@Override
public void run() {
long lastFlushTime = 0;
long lastFlushTime = System.nanoTime();
while (!closed) {
// We flush on the timer if there are pending syncs there and we've waited at least one
@ -386,17 +383,22 @@ public final class TimedBuffer {
if (pendingSync) {
if (useSleep) {
// if using sleep, we will always flush
flush();
lastFlushTime = System.nanoTime();
flush();
} else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) {
lastFlushTime = System.nanoTime();
// if not using flush we will spin and do the time checks manually
flush();
lastFlushTime = System.nanoTime();
}
}
sleepIfPossible();
//it could wait until the timeout is expired
final long timeFromTheLastFlush = System.nanoTime() - lastFlushTime;
final long timeToSleep = timeFromTheLastFlush - timeout;
if (timeToSleep > 0) {
sleepIfPossible(timeToSleep);
}
try {
spinLimiter.acquire();
@ -415,36 +417,29 @@ public final class TimedBuffer {
* we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
* if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
*/
private void sleepIfPossible() {
private void sleepIfPossible(long nanosToSleep) {
if (useSleep) {
if (checks < MAX_CHECKS_ON_SLEEP) {
timeBefore = System.nanoTime();
}
try {
sleep(sleepMillis, sleepNanos);
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
final long startSleep = System.nanoTime();
sleep(nanosToSleep);
final long elapsedSleep = System.nanoTime() - startSleep;
if (checks < MAX_CHECKS_ON_SLEEP) {
// I'm letting the real time to be up to 50% than the requested sleep.
if (elapsedSleep > nanosToSleep * 1.5) {
failedChecks++;
}
if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
ActiveMQJournalLogger.LOGGER.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
useSleep = false;
}
}
}
} catch (Exception e) {
useSleep = false;
ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
}
if (checks < MAX_CHECKS_ON_SLEEP) {
long realTimeSleep = System.nanoTime() - timeBefore;
// I'm letting the real time to be up to 50% than the requested sleep.
if (realTimeSleep > timeout * 1.5) {
failedChecks++;
}
if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
useSleep = false;
}
}
}
}
}
@ -456,12 +451,11 @@ public final class TimedBuffer {
/**
* Sub classes (tests basically) can use this to override how the sleep is being done
*
* @param sleepMillis
* @param sleepNanos
* @throws InterruptedException
*/
protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
Thread.sleep(sleepMillis, sleepNanos);
protected void sleep(long sleepNanos) {
LockSupport.parkNanos(sleepNanos);
}
/**

View File

@ -21,13 +21,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Assert;
import org.junit.Test;
@ -217,6 +221,245 @@ public class TimedBufferTest extends ActiveMQTestBase {
}
private static void spinSleep(long timeout) {
if (timeout > 0) {
final long deadline = System.nanoTime() + timeout;
while (System.nanoTime() < deadline) {
//spin wait
}
}
}
private static final class NonBlockingObserver implements TimedBufferObserver, AutoCloseable {
private long flushes = 0;
private final ByteBuffer dummyBuffer;
private final Thread asyncIOWriter;
private final AtomicLong flushRequest = new AtomicLong(0);
private final AtomicLong flushesDone = new AtomicLong(0);
private NonBlockingObserver(int bufferSize, long deviceTime) {
this.asyncIOWriter = new Thread(() -> {
long flushes = 0;
while (!Thread.interrupted()) {
if (flushRequest.get() > flushes) {
final long flushesToBePerformed = flushRequest.get() - flushes;
//during the flush time no new flush request can be taken!
for (int i = 0; i < flushesToBePerformed; i++) {
spinSleep(deviceTime);
flushes++;
//make progress to let others being notified
flushesDone.lazySet(flushes);
}
}
}
});
dummyBuffer = ByteBuffer.allocate(bufferSize);
asyncIOWriter.start();
}
@Override
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
assert sync;
assert dummyBuffer == buffer;
if (buffer.position() > 0) {
dummyBuffer.clear();
flushes++;
//ask the device to perform a flush
flushRequest.lazySet(flushes);
}
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
*/
@Override
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
assert maxSize <= dummyBuffer.capacity();
dummyBuffer.limit(minSize);
return dummyBuffer;
}
@Override
public int getRemainingBytes() {
return Integer.MAX_VALUE;
}
@Override
public void close() {
asyncIOWriter.interrupt();
}
public void waitUntilFlushIsDone(long flush) {
//spin wait to be more reactive
while (flushesDone.get() < flush) {
}
}
public long flushesDone() {
return flushesDone.get();
}
}
private static final class BlockingObserver implements TimedBufferObserver, AutoCloseable {
private long flushes = 0;
private final ByteBuffer dummyBuffer;
private final long deviceTime;
private final AtomicLong flushesDone = new AtomicLong(0);
private BlockingObserver(int bufferSize, long deviceTime) {
this.dummyBuffer = ByteBuffer.allocate(bufferSize);
this.deviceTime = deviceTime;
}
@Override
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
assert sync;
assert dummyBuffer == buffer;
if (dummyBuffer.position() > 0) {
dummyBuffer.clear();
//emulate the flush time of a blocking device with a precise sleep
spinSleep(deviceTime);
flushes++;
//publish the number of flushes happened
flushesDone.lazySet(flushes);
}
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
*/
@Override
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
assert maxSize <= dummyBuffer.capacity();
dummyBuffer.limit(minSize);
return dummyBuffer;
}
@Override
public int getRemainingBytes() {
return Integer.MAX_VALUE;
}
@Override
public void close() {
//no op
}
public void waitUntilFlushIsDone(long flush) {
//spin wait to be more reactive
while (flushesDone.get() < flush) {
}
}
public long flushesDone() {
return flushesDone.get();
}
}
private static final EncodingSupport LONG_ENCODER = new EncodingSupport() {
@Override
public int getEncodeSize() {
return Long.BYTES;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(1L);
}
@Override
public void decode(ActiveMQBuffer buffer) {
}
};
/**
* This test is showing the behaviour of the TimedBuffer with a blocking API (NIO/MAPPED) and
* how the timeout value is not == 1/IOPS like the ASYNCIO case
*/
@Test
public void timeoutShouldMatchFlushIOPSWithNotBlockingFlush() {
//use a large timeout in order to be reactive
final long timeout = TimeUnit.SECONDS.toNanos(2);
assert ((int) timeout) > 0;
//it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match
final long deviceTime = timeout;
final int bufferSize = Env.osPageSize();
final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false);
timedBuffer.start();
try (NonBlockingObserver observer = new NonBlockingObserver(bufferSize, deviceTime)) {
timedBuffer.setObserver(observer);
//do not call checkSize because we already know that will succeed
timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
//wait the first flush to happen
observer.waitUntilFlushIsDone(1);
//for a not-blocking flush I'm expecting the TimedBuffer has near to finished sleeping now
assert observer.flushesDone() == 1;
//issue a new write
timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
//the countdown on the TimedBuffer is already started even before this addBytes
final long endOfWriteRequest = System.nanoTime();
//wait until it will succeed
observer.waitUntilFlushIsDone(2);
final long flushDone = System.nanoTime();
final long elapsedTime = flushDone - endOfWriteRequest;
assert observer.flushesDone() == 2;
//it is much more than what is expected!!if it will fail it means that the timed IOPS = 1/(timeout + blockingDeviceFlushTime)!!!!!!
//while it has to be IOPS = 1/timeout
System.out.println("elapsed time: " + elapsedTime + " with timeout: " + timeout);
final long maxExpected = timeout + deviceTime;
Assert.assertTrue("elapsed = " + elapsedTime + " max expected = " + maxExpected, elapsedTime <= maxExpected);
} finally {
timedBuffer.stop();
}
}
/**
* This test is showing the behaviour of the TimedBuffer with a blocking API (NIO/MAPPED) and
* how the timeout value is not == 1/IOPS like the ASYNCIO case
*/
@Test
public void timeoutShouldMatchFlushIOPSWithBlockingFlush() {
//use a large timeout in order to be reactive
final long timeout = TimeUnit.SECONDS.toNanos(2);
assert ((int) timeout) > 0;
//it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match
final long deviceTime = timeout;
final int bufferSize = Env.osPageSize();
final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false);
timedBuffer.start();
try (BlockingObserver observer = new BlockingObserver(bufferSize, deviceTime)) {
timedBuffer.setObserver(observer);
//do not call checkSize because we already know that will succeed
timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
//wait the first flush to happen
observer.waitUntilFlushIsDone(1);
//for a blocking flush I'm expecting the TimedBuffer has started sleeping now
assert observer.flushesDone() == 1;
//issue a new write
timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
//the countdown on the TimedBuffer is already started even before this addBytes
final long endOfWriteRequest = System.nanoTime();
//wait until it will succeed
observer.waitUntilFlushIsDone(2);
final long flushDone = System.nanoTime();
final long elapsedTime = flushDone - endOfWriteRequest;
assert observer.flushesDone() == 2;
//it is much more than what is expected!!if it will fail it means that the timed IOPS = 1/(timeout + blockingDeviceFlushTime)!!!!!!
//while it has to be IOPS = 1/timeout
System.out.println("elapsed time: " + elapsedTime + " with timeout: " + timeout);
final long maxExpected = timeout + deviceTime;
Assert.assertTrue("elapsed = " + elapsedTime + " max expected = " + maxExpected, elapsedTime <= maxExpected);
} finally {
timedBuffer.stop();
}
}
@Test
public void testTimingAndFlush() throws Exception {
final ArrayList<ByteBuffer> buffers = new ArrayList<>();