ARTEMIS-1294 Using older sleep on TimedBuffer

And also adding test
This commit is contained in:
Clebert Suconic 2017-07-18 10:03:47 -04:00
parent 41a03de02f
commit ad372ec98e
2 changed files with 117 additions and 3 deletions

View File

@ -23,7 +23,6 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -48,7 +47,7 @@ public final class TimedBuffer {
// prevent that
private final Semaphore spinLimiter = new Semaphore(1);
private CheckTimer timerRunnable = null;
private CheckTimer timerRunnable;
private final int bufferSize;
@ -371,6 +370,9 @@ public final class TimedBuffer {
int failedChecks = 0;
long timeBefore = 0;
final int sleepMillis = timeout / 1000000; // truncates
final int sleepNanos = timeout % 1000000;
@Override
public void run() {
long lastFlushTime = 0;
@ -419,7 +421,14 @@ public final class TimedBuffer {
timeBefore = System.nanoTime();
}
LockSupport.parkNanos(timeout);
try {
sleep(sleepMillis, sleepNanos);
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} catch (Exception e) {
useSleep = false;
ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
}
if (checks < MAX_CHECKS_ON_SLEEP) {
long realTimeSleep = System.nanoTime() - timeBefore;
@ -444,6 +453,17 @@ public final class TimedBuffer {
}
}
/**
* Sub classes (tests basically) can use this to override how the sleep is being done
*
* @param sleepMillis
* @param sleepNanos
* @throws InterruptedException
*/
protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
Thread.sleep(sleepMillis, sleepNanos);
}
/**
* Sub classes (tests basically) can use this to override disabling spinning
*/

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -27,6 +28,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Assert;
import org.junit.Test;
@ -121,6 +123,98 @@ public class TimedBufferTest extends ActiveMQTestBase {
timedBuffer.stop();
}
}
@Test
public void testTimeOnTimedBuffer() throws Exception {
final ReusableLatch latchFlushed = new ReusableLatch(0);
final AtomicInteger flushes = new AtomicInteger(0);
class TestObserver implements TimedBufferObserver {
@Override
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
for (IOCallback callback : callbacks) {
callback.done();
}
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
*/
@Override
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
return ByteBuffer.allocate(maxSize);
}
@Override
public int getRemainingBytes() {
return 1024 * 1024;
}
}
TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false);
timedBuffer.start();
TestObserver observer = new TestObserver();
timedBuffer.setObserver(observer);
int x = 0;
byte[] bytes = new byte[10];
for (int j = 0; j < 10; j++) {
bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
}
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
IOCallback callback = new IOCallback() {
@Override
public void done() {
System.out.println("done");
latchFlushed.countDown();
}
@Override
public void onError(int errorCode, String errorMessage) {
}
};
try {
latchFlushed.setCount(2);
// simulating a low load period
timedBuffer.addBytes(buff, true, callback);
Thread.sleep(1000);
timedBuffer.addBytes(buff, true, callback);
Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS));
latchFlushed.setCount(5);
flushes.set(0);
// Sending like crazy... still some wait (1 millisecond) between each send..
long time = System.currentTimeMillis();
for (int i = 0; i < 5; i++) {
timedBuffer.addBytes(buff, true, callback);
Thread.sleep(1);
}
Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS));
// The purpose of the timed buffer is to batch writes up to a millisecond.. or up to the size of the buffer.
Assert.assertTrue("Timed Buffer is not batching accordingly, it was expected to take at least 500 seconds batching multiple writes while it took " + (System.currentTimeMillis() - time) + " milliseconds", System.currentTimeMillis() - time >= 500);
// it should be in fact only writing once..
// i will set for 3 just in case there's a GC or anything else happening on the test
Assert.assertTrue("Too many writes were called", flushes.get() <= 3);
} finally {
timedBuffer.stop();
}
}
@Test