From 567bfe3b9b1192932fffe00de0892a4117e2695c Mon Sep 17 00:00:00 2001
From: Francesco Nigro
Date: Fri, 28 Jul 2017 16:25:53 +0200
Subject: [PATCH] ARTEMIS-1312 TimedBuffer doubled timeout with blocking flush

The timer thread now takes the flush timestamp before calling flush() and,
after the flush, sleeps only for the part of the timeout that is still left
(timeout minus the time elapsed since that timestamp). The time spent inside
a blocking flush is therefore no longer added on top of the timeout.

The sleep itself is done with LockSupport.parkNanos instead of Thread.sleep,
so sleep(long) no longer throws InterruptedException.
---
 .../artemis/core/io/buffer/TimedBuffer.java   |  67 +++--
 .../core/journal/impl/TimedBufferTest.java    | 243 ++++++++++++++++++
 2 files changed, 273 insertions(+), 37 deletions(-)

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index 087453de5a..b74ada40d9 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -23,6 +23,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
 
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -368,14 +369,10 @@ public final class TimedBuffer {
 
       int checks = 0;
       int failedChecks = 0;
-      long timeBefore = 0;
-
-      final int sleepMillis = timeout / 1000000; // truncates
-      final int sleepNanos = timeout % 1000000;
 
       @Override
       public void run() {
-         long lastFlushTime = 0;
+         long lastFlushTime = System.nanoTime();
 
          while (!closed) {
             // We flush on the timer if there are pending syncs there and we've waited at least one
@@ -386,17 +383,22 @@ public final class TimedBuffer {
             if (pendingSync) {
                if (useSleep) {
                   // if using sleep, we will always flush
-                  flush();
                   lastFlushTime = System.nanoTime();
+                  flush();
+
                } else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) {
+                  lastFlushTime = System.nanoTime();
                   // if not using flush we will spin and do the time checks manually
                   flush();
-                  lastFlushTime = System.nanoTime();
                }
 
             }
-
-            sleepIfPossible();
+            //sleep only for the part of the timeout not already consumed since the last flush started
+            final long timeFromTheLastFlush = System.nanoTime() - lastFlushTime;
+            final long timeToSleep = timeout - timeFromTheLastFlush;
+            if (timeToSleep > 0) {
+               sleepIfPossible(timeToSleep);
+            }
 
             try {
                spinLimiter.acquire();
@@ -415,36 +417,29 @@ public final class TimedBuffer {
        * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
        * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
        */
-      private void sleepIfPossible() {
+      private void sleepIfPossible(long nanosToSleep) {
          if (useSleep) {
-            if (checks < MAX_CHECKS_ON_SLEEP) {
-               timeBefore = System.nanoTime();
-            }
-
             try {
-               sleep(sleepMillis, sleepNanos);
-            } catch (InterruptedException e) {
-               throw new ActiveMQInterruptedException(e);
+               final long startSleep = System.nanoTime();
+               sleep(nanosToSleep);
+               final long elapsedSleep = System.nanoTime() - startSleep;
+               if (checks < MAX_CHECKS_ON_SLEEP) {
+                  // I'm letting the real time to be up to 50% than the requested sleep.
+                  if (elapsedSleep > nanosToSleep * 1.5) {
+                     failedChecks++;
+                  }
+
+                  if (++checks >= MAX_CHECKS_ON_SLEEP) {
+                     if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
+                        ActiveMQJournalLogger.LOGGER.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
+                        useSleep = false;
+                     }
+                  }
+               }
             } catch (Exception e) {
                useSleep = false;
                ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
             }
-
-            if (checks < MAX_CHECKS_ON_SLEEP) {
-               long realTimeSleep = System.nanoTime() - timeBefore;
-
-               // I'm letting the real time to be up to 50% than the requested sleep.
-               if (realTimeSleep > timeout * 1.5) {
-                  failedChecks++;
-               }
-
-               if (++checks >= MAX_CHECKS_ON_SLEEP) {
-                  if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
-                     ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
-                     useSleep = false;
-                  }
-               }
-            }
          }
       }
 
@@ -456,12 +451,10 @@ public final class TimedBuffer {
    /**
     * Sub classes (tests basically) can use this to override how the sleep is being done
     *
-    * @param sleepMillis
     * @param sleepNanos
-    * @throws InterruptedException
     */
-   protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
-      Thread.sleep(sleepMillis, sleepNanos);
+   protected void sleep(long sleepNanos) {
+      LockSupport.parkNanos(sleepNanos);
    }
 
    /**
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
index bddb7ea987..0e7f7c2029 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
@@ -21,13 +21,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.io.DummyCallback;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.junit.Assert;
 import org.junit.Test;
@@ -217,6 +221,245 @@ public class TimedBufferTest extends ActiveMQTestBase {
 
    }
 
+   private static void spinSleep(long timeout) {
+      if (timeout > 0) {
+         final long deadline = System.nanoTime() + timeout;
+         while (System.nanoTime() < deadline) {
+            //spin wait
+         }
+      }
+   }
+
+   private static final class NonBlockingObserver implements TimedBufferObserver, AutoCloseable {
+
+      private long flushes = 0;
+      private final ByteBuffer dummyBuffer;
+      private final Thread asyncIOWriter;
+      private final AtomicLong flushRequest = new AtomicLong(0);
+      private final AtomicLong flushesDone = new AtomicLong(0);
+
+      private NonBlockingObserver(int bufferSize, long deviceTime) {
+         this.asyncIOWriter = new Thread(() -> {
+            long flushes = 0;
+            while (!Thread.interrupted()) {
+               if (flushRequest.get() > flushes) {
+                  final long flushesToBePerformed = flushRequest.get() - flushes;
+                  //during the flush time no new flush request can be taken!
+                  for (int i = 0; i < flushesToBePerformed; i++) {
+                     spinSleep(deviceTime);
+                     flushes++;
+                     //make progress to let others being notified
+                     flushesDone.lazySet(flushes);
+                  }
+               }
+            }
+         });
+         dummyBuffer = ByteBuffer.allocate(bufferSize);
+         asyncIOWriter.start();
+      }
+
+      @Override
+      public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List callbacks) {
+         assert sync;
+         assert dummyBuffer == buffer;
+         if (buffer.position() > 0) {
+            dummyBuffer.clear();
+            flushes++;
+            //ask the device to perform a flush
+            flushRequest.lazySet(flushes);
+         }
+      }
+
+      /* (non-Javadoc)
+       * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+       */
+      @Override
+      public ByteBuffer newBuffer(final int minSize, final int maxSize) {
+         assert maxSize <= dummyBuffer.capacity();
+         dummyBuffer.limit(minSize);
+         return dummyBuffer;
+      }
+
+      @Override
+      public int getRemainingBytes() {
+         return Integer.MAX_VALUE;
+      }
+
+      @Override
+      public void close() {
+         asyncIOWriter.interrupt();
+      }
+
+      public void waitUntilFlushIsDone(long flush) {
+         //spin wait to be more reactive
+         while (flushesDone.get() < flush) {
+
+         }
+      }
+
+      public long flushesDone() {
+         return flushesDone.get();
+      }
+   }
+
+   private static final class BlockingObserver implements TimedBufferObserver, AutoCloseable {
+
+      private long flushes = 0;
+      private final ByteBuffer dummyBuffer;
+      private final long deviceTime;
+      private final AtomicLong flushesDone = new AtomicLong(0);
+
+      private BlockingObserver(int bufferSize, long deviceTime) {
+         this.dummyBuffer = ByteBuffer.allocate(bufferSize);
+         this.deviceTime = deviceTime;
+      }
+
+      @Override
+      public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List callbacks) {
+         assert sync;
+         assert dummyBuffer == buffer;
+         if (dummyBuffer.position() > 0) {
+            dummyBuffer.clear();
+            //emulate the flush time of a blocking device with a precise sleep
+            spinSleep(deviceTime);
+            flushes++;
+            //publish the number of flushes happened
+            flushesDone.lazySet(flushes);
+         }
+      }
+
+      /* (non-Javadoc)
+       * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+       */
+      @Override
+      public ByteBuffer newBuffer(final int minSize, final int maxSize) {
+         assert maxSize <= dummyBuffer.capacity();
+         dummyBuffer.limit(minSize);
+         return dummyBuffer;
+      }
+
+      @Override
+      public int getRemainingBytes() {
+         return Integer.MAX_VALUE;
+      }
+
+      @Override
+      public void close() {
+         //no op
+      }
+
+      public void waitUntilFlushIsDone(long flush) {
+         //spin wait to be more reactive
+         while (flushesDone.get() < flush) {
+
+         }
+      }
+
+      public long flushesDone() {
+         return flushesDone.get();
+      }
+
+   }
+
+   private static final EncodingSupport LONG_ENCODER = new EncodingSupport() {
+      @Override
+      public int getEncodeSize() {
+         return Long.BYTES;
+      }
+
+      @Override
+      public void encode(ActiveMQBuffer buffer) {
+         buffer.writeLong(1L);
+      }
+
+      @Override
+      public void decode(ActiveMQBuffer buffer) {
+
+      }
+   };
+
+   /**
+    * This test is showing the behaviour of the TimedBuffer with a non-blocking flush (ASYNCIO-like) and
+    * how the timeout value is expected to be == 1/IOPS
+    */
+   @Test
+   public void timeoutShouldMatchFlushIOPSWithNotBlockingFlush() {
+      //use a large timeout in order to be reactive
+      final long timeout = TimeUnit.SECONDS.toNanos(2);
+      assert ((int) timeout) > 0;
+      //it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match
+      final long deviceTime = timeout;
+      final int bufferSize = Env.osPageSize();
+      final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false);
+      timedBuffer.start();
+      try (NonBlockingObserver observer = new NonBlockingObserver(bufferSize, deviceTime)) {
+         timedBuffer.setObserver(observer);
+         //do not call checkSize because we already know that will succeed
+         timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
+         //wait the first flush to happen
+         observer.waitUntilFlushIsDone(1);
+         //for a not-blocking flush I'm expecting the TimedBuffer has near to finished sleeping now
+         assert observer.flushesDone() == 1;
+         //issue a new write
+         timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
+         //the countdown on the TimedBuffer is already started even before this addBytes
+         final long endOfWriteRequest = System.nanoTime();
+         //wait until it will succeed
+         observer.waitUntilFlushIsDone(2);
+         final long flushDone = System.nanoTime();
+         final long elapsedTime = flushDone - endOfWriteRequest;
+         assert observer.flushesDone() == 2;
+         //it is much more than what is expected!!if it will fail it means that the timed IOPS = 1/(timeout + blockingDeviceFlushTime)!!!!!!
+         //while it has to be IOPS = 1/timeout
+         System.out.println("elapsed time: " + elapsedTime + " with timeout: " + timeout);
+         final long maxExpected = timeout + deviceTime;
+         Assert.assertTrue("elapsed = " + elapsedTime + " max expected = " + maxExpected, elapsedTime <= maxExpected);
+      } finally {
+         timedBuffer.stop();
+      }
+   }
+
+   /**
+    * This test is showing the behaviour of the TimedBuffer with a blocking API (NIO/MAPPED) and
+    * how the timeout value is not == 1/IOPS like the ASYNCIO case
+    */
+   @Test
+   public void timeoutShouldMatchFlushIOPSWithBlockingFlush() {
+      //use a large timeout in order to be reactive
+      final long timeout = TimeUnit.SECONDS.toNanos(2);
+      assert ((int) timeout) > 0;
+      //it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match
+      final long deviceTime = timeout;
+      final int bufferSize = Env.osPageSize();
+      final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false);
+      timedBuffer.start();
+      try (BlockingObserver observer = new BlockingObserver(bufferSize, deviceTime)) {
+         timedBuffer.setObserver(observer);
+         //do not call checkSize because we already know that will succeed
+         timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
+         //wait the first flush to happen
+         observer.waitUntilFlushIsDone(1);
+         //for a blocking flush I'm expecting the TimedBuffer has started sleeping now
+         assert observer.flushesDone() == 1;
+         //issue a new write
+         timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
+         //the countdown on the TimedBuffer is already started even before this addBytes
+         final long endOfWriteRequest = System.nanoTime();
+         //wait until it will succeed
+         observer.waitUntilFlushIsDone(2);
+         final long flushDone = System.nanoTime();
+         final long elapsedTime = flushDone - endOfWriteRequest;
+         assert observer.flushesDone() == 2;
+         //it is much more than what is expected!!if it will fail it means that the timed IOPS = 1/(timeout + blockingDeviceFlushTime)!!!!!!
+         //while it has to be IOPS = 1/timeout
+         System.out.println("elapsed time: " + elapsedTime + " with timeout: " + timeout);
+         final long maxExpected = timeout + deviceTime;
+         Assert.assertTrue("elapsed = " + elapsedTime + " max expected = " + maxExpected, elapsedTime <= maxExpected);
+      } finally {
+         timedBuffer.stop();
+      }
+   }
+
    @Test
    public void testTimingAndFlush() throws Exception {
       final ArrayList buffers = new ArrayList<>();