ARTEMIS-1151 Adapting TimedBuffer and NIO Buffer Pooling

- NIO/ASYNCIO new TimedBuffer with adapting batch window heuristic
- NIO/ASYNCIO improved TimedBuffer write monitoring with
  lightweight concurrent performance counters
- NIO/ASYNCIO journal/paging operations benefit from less buffer copy
- NIO/ASYNCIO any buffer copy is always performed with raw batch copy
  using SIMD instrinsics (System::arrayCopy) or memcpy under the hood
- NIO improved clear buffers using SIMD instrinsics (Arrays::fill) and/or memset
- NIO journal operation perform by default TLABs allocation pooling (off heap)
  retaining only the last max sized buffer
- NIO improved file copy operations using zero-copy FileChannel::transfertTo
- NIO improved zeroing using pooled single OS page buffer to clean the file
  + pwrite (on Linux)
- NIO deterministic release of unpooled direct buffers to avoid OOM errors
  due to slow GC
- Exposed OS PAGE SIZE value using Env class
This commit is contained in:
Francesco Nigro 2017-05-02 11:47:44 +02:00 committed by Clebert Suconic
parent 36c9659279
commit 21c9ed85cf
7 changed files with 323 additions and 354 deletions

View File

@ -181,6 +181,7 @@ public class SyncCalculation {
case NIO:
factory = new NIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
((NIOSequentialFileFactory) factory).disableBufferReuse();
factory.start();
return factory;
case ASYNCIO:

View File

@ -17,18 +17,47 @@
package org.apache.activemq.artemis.utils;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
/**
* Utility that detects various properties specific to the current runtime
* environment, such as JVM bitness and OS type.
*/
public final class Env {
/** The system will change a few logs and semantics to be suitable to
* run a long testsuite.
* Like a few log entries that are only valid during a production system.
* or a few cases we need to know as warn on the testsuite and as log in production. */
private static boolean testEnv = false;
private static final int OS_PAGE_SIZE;
static {
//most common OS page size value
int osPageSize = 4096;
sun.misc.Unsafe instance;
try {
Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
instance = (sun.misc.Unsafe) field.get((Object) null);
} catch (Throwable t) {
try {
Constructor<sun.misc.Unsafe> c = sun.misc.Unsafe.class.getDeclaredConstructor(new Class[0]);
c.setAccessible(true);
instance = c.newInstance(new Object[0]);
} catch (Throwable t1) {
instance = null;
}
}
if (instance != null) {
osPageSize = instance.pageSize();
}
OS_PAGE_SIZE = osPageSize;
}
/**
* The system will change a few logs and semantics to be suitable to
* run a long testsuite.
* Like a few log entries that are only valid during a production system.
* or a few cases we need to know as warn on the testsuite and as log in production.
*/
private static boolean testEnv = false;
private static final String OS = System.getProperty("os.name").toLowerCase();
private static final boolean IS_LINUX = OS.startsWith("linux");
@ -38,6 +67,14 @@ public final class Env {
}
/**
* Return the size in bytes of a OS memory page.
* This value will always be a power of two.
*/
public static int osPageSize() {
return OS_PAGE_SIZE;
}
public static boolean isTestEnv() {
return testEnv;
}

View File

@ -189,9 +189,12 @@ public abstract class AbstractSequentialFile implements SequentialFile {
bytes.setIndex(0, bytes.capacity());
timedBuffer.addBytes(bytes, sync, callback);
} else {
ByteBuffer buffer = factory.newBuffer(bytes.capacity());
buffer.put(bytes.toByteBuffer().array());
buffer.rewind();
final int readableBytes = bytes.readableBytes();
final ByteBuffer buffer = factory.newBuffer(readableBytes);
//factory::newBuffer doesn't necessary return a buffer with limit == readableBytes!!
buffer.limit(readableBytes);
bytes.getBytes(bytes.readerIndex(), buffer);
buffer.flip();
writeDirect(buffer, sync, callback);
}
}
@ -215,15 +218,12 @@ public abstract class AbstractSequentialFile implements SequentialFile {
if (timedBuffer != null) {
timedBuffer.addBytes(bytes, sync, callback);
} else {
ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
// If not using the TimedBuffer, a final copy is necessary
// Because AIO will need a specific Buffer
// And NIO will also need a whole buffer to perform the write
final int encodedSize = bytes.getEncodeSize();
ByteBuffer buffer = factory.newBuffer(encodedSize);
ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer);
bytes.encode(outBuffer);
buffer.rewind();
buffer.clear();
buffer.limit(encodedSize);
writeDirect(buffer, sync, callback);
}
}
@ -255,9 +255,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
@Override
public void done() {
for (IOCallback callback : delegates) {
final int size = delegates.size();
for (int i = 0; i < size; i++) {
try {
callback.done();
delegates.get(i).done();
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
}
@ -266,9 +267,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
@Override
public void onError(final int errorCode, final String errorMessage) {
for (IOCallback callback : delegates) {
final int size = delegates.size();
for (int i = 0; i < size; i++) {
try {
callback.onError(errorCode, errorMessage);
delegates.get(i).onError(errorCode, errorMessage);
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
}

View File

@ -18,27 +18,25 @@ package org.apache.activemq.artemis.core.io.buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class TimedBuffer {
public final class TimedBuffer {
// Constants -----------------------------------------------------
// The number of tries on sleep before switching to spin
public static final int MAX_CHECKS_ON_SLEEP = 20;
// Attributes ----------------------------------------------------
private TimedBufferObserver bufferObserver;
@ -58,10 +56,9 @@ public class TimedBuffer {
private List<IOCallback> callbacks;
private volatile int timeout;
private final int timeout;
// used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
private volatile boolean pendingSync = false;
private final AtomicLong pendingSyncs = new AtomicLong();
private Thread timerThread;
@ -76,7 +73,7 @@ public class TimedBuffer {
private final boolean logRates;
private final AtomicLong bytesFlushed = new AtomicLong(0);
private long bytesFlushed = 0;
private final AtomicLong flushesDone = new AtomicLong(0);
@ -84,8 +81,6 @@ public class TimedBuffer {
private TimerTask logRatesTimerTask;
private boolean useSleep = true;
// no need to be volatile as every access is synchronized
private boolean spinning = false;
@ -104,27 +99,18 @@ public class TimedBuffer {
logRatesTimer = new Timer(true);
}
// Setting the interval for nano-sleeps
buffer = ActiveMQBuffers.fixedBuffer(bufferSize);
//prefer off heap buffer to allow further humongous allocations and reduce GC overhead
buffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size));
buffer.clear();
bufferLimit = 0;
callbacks = new ArrayList<>();
callbacks = null;
this.timeout = timeout;
}
// for Debug purposes
public synchronized boolean isUseSleep() {
return useSleep;
}
public synchronized void setUseSleep(boolean useSleep) {
this.useSleep = useSleep;
}
public synchronized void start() {
if (started) {
return;
@ -232,7 +218,28 @@ public class TimedBuffer {
}
public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) {
addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
}
delayFlush = false;
//it doesn't modify the reader index of bytes as in the original version
final int readableBytes = bytes.readableBytes();
final int writerIndex = buffer.writerIndex();
buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
buffer.writerIndex(writerIndex + readableBytes);
if (callbacks == null) {
callbacks = new ArrayList<>();
}
callbacks.add(callback);
if (sync) {
final long currentPendingSyncs = pendingSyncs.get();
pendingSyncs.lazySet(currentPendingSyncs + 1);
startSpin();
}
}
public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) {
@ -244,11 +251,14 @@ public class TimedBuffer {
bytes.encode(buffer);
if (callbacks == null) {
callbacks = new ArrayList<>();
}
callbacks.add(callback);
if (sync) {
pendingSync = true;
final long currentPendingSyncs = pendingSyncs.get();
pendingSyncs.lazySet(currentPendingSyncs + 1);
startSpin();
}
@ -262,45 +272,49 @@ public class TimedBuffer {
* force means the Journal is moving to a new file. Any pending write need to be done immediately
* or data could be lost
*/
public void flush(final boolean force) {
private void flush(final boolean force) {
synchronized (this) {
if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
}
if ((force || !delayFlush) && buffer.writerIndex() > 0) {
int pos = buffer.writerIndex();
final int pos = buffer.writerIndex();
if (logRates) {
bytesFlushed.addAndGet(pos);
}
final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
//bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
bufferToFlush.limit(pos);
//perform memcpy under the hood due to the off heap buffer
buffer.getBytes(0, bufferToFlush);
ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
// Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
// Using bufferToFlush.put(buffer) would make several append calls for each byte
// We also transfer the content of this buffer to the native file's buffer
bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
final List<IOCallback> ioCallbacks = callbacks == null ? Collections.emptyList() : callbacks;
bufferObserver.flushBuffer(bufferToFlush, pendingSyncs.get() > 0, ioCallbacks);
stopSpin();
pendingSync = false;
pendingSyncs.lazySet(0);
// swap the instance as the previous callback list is being used asynchronously
callbacks = new LinkedList<>();
callbacks = null;
buffer.clear();
bufferLimit = 0;
flushesDone.incrementAndGet();
if (logRates) {
logFlushed(pos);
}
}
}
}
private void logFlushed(int bytes) {
this.bytesFlushed += bytes;
//more lightweight than XADD if single writer
final long currentFlushesDone = flushesDone.get();
//flushesDone::lazySet write-Release bytesFlushed
flushesDone.lazySet(currentFlushesDone + 1L);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@ -324,21 +338,21 @@ public class TimedBuffer {
if (!closed) {
long now = System.currentTimeMillis();
long bytesF = bytesFlushed.get();
long flushesD = flushesDone.get();
final long flushesDone = TimedBuffer.this.flushesDone.get();
//flushesDone::get read-Acquire bytesFlushed
final long bytesFlushed = TimedBuffer.this.bytesFlushed;
if (lastExecution != 0) {
double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution);
final double rate = 1000 * (double) (bytesFlushed - lastBytesFlushed) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution);
final double flushRate = 1000 * (double) (flushesDone - lastFlushesDone) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.flushRate(flushRate);
}
lastExecution = now;
lastBytesFlushed = bytesF;
lastBytesFlushed = bytesFlushed;
lastFlushesDone = flushesD;
lastFlushesDone = flushesDone;
}
}
@ -354,85 +368,41 @@ public class TimedBuffer {
private volatile boolean closed = false;
int checks = 0;
int failedChecks = 0;
long timeBefore = 0;
final int sleepMillis = timeout / 1000000; // truncates
final int sleepNanos = timeout % 1000000;
@Override
public void run() {
int waitTimes = 0;
long lastFlushTime = 0;
long estimatedOptimalBatch = Runtime.getRuntime().availableProcessors();
final Semaphore spinLimiter = TimedBuffer.this.spinLimiter;
final long timeout = TimedBuffer.this.timeout;
while (!closed) {
// We flush on the timer if there are pending syncs there and we've waited at least one
// timeout since the time of the last flush.
// Effectively flushing "resets" the timer
// On the timeout verification, notice that we ignore the timeout check if we are using sleep
boolean flushed = false;
final long currentPendingSyncs = pendingSyncs.get();
if (pendingSync) {
if (isUseSleep()) {
// if using sleep, we will always flush
flush();
lastFlushTime = System.nanoTime();
} else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) {
// if not using flush we will spin and do the time checks manually
flush();
lastFlushTime = System.nanoTime();
}
}
sleepIfPossible();
try {
spinLimiter.acquire();
Thread.yield();
spinLimiter.release();
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
}
/**
* We will attempt to use sleep only if the system supports nano-sleep
* we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
* if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
*/
private void sleepIfPossible() {
if (isUseSleep()) {
if (checks < MAX_CHECKS_ON_SLEEP) {
timeBefore = System.nanoTime();
}
try {
sleep(sleepMillis, sleepNanos);
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} catch (Exception e) {
setUseSleep(false);
ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
}
if (checks < MAX_CHECKS_ON_SLEEP) {
long realTimeSleep = System.nanoTime() - timeBefore;
// I'm letting the real time to be up to 50% than the requested sleep.
if (realTimeSleep > timeout * 1.5) {
failedChecks++;
}
if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
setUseSleep(false);
if (currentPendingSyncs > 0) {
if (bufferObserver != null) {
final boolean checkpoint = System.nanoTime() > lastFlushTime + timeout;
if (checkpoint || currentPendingSyncs >= estimatedOptimalBatch) {
flush();
if (checkpoint) {
estimatedOptimalBatch = currentPendingSyncs;
} else {
estimatedOptimalBatch = Math.max(estimatedOptimalBatch, currentPendingSyncs);
}
lastFlushTime = System.nanoTime();
//a flush has been requested
flushed = true;
}
}
}
if (flushed) {
waitTimes = 0;
} else {
//instead of interruptible sleeping, perform progressive parks depending on the load
waitTimes = TimedBuffer.wait(waitTimes, spinLimiter);
}
}
}
@ -441,15 +411,33 @@ public class TimedBuffer {
}
}
/**
* Sub classes (tests basically) can use this to override how the sleep is being done
*
* @param sleepMillis
* @param sleepNanos
* @throws InterruptedException
*/
protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
Thread.sleep(sleepMillis, sleepNanos);
private static int wait(int waitTimes, Semaphore spinLimiter) {
if (waitTimes < 10) {
//doesn't make sense to spin loop here, because of the lock around flush/addBytes operations!
Thread.yield();
waitTimes++;
} else if (waitTimes < 20) {
LockSupport.parkNanos(1L);
waitTimes++;
} else if (waitTimes < 50) {
LockSupport.parkNanos(10L);
waitTimes++;
} else if (waitTimes < 100) {
LockSupport.parkNanos(100L);
waitTimes++;
} else if (waitTimes < 1000) {
LockSupport.parkNanos(1000L);
waitTimes++;
} else {
LockSupport.parkNanos(100_000L);
try {
spinLimiter.acquire();
spinLimiter.release();
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
return waitTimes;
}
/**

View File

@ -22,6 +22,7 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -33,6 +34,8 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.Env;
public final class NIOSequentialFile extends AbstractSequentialFile {
@ -40,9 +43,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
private RandomAccessFile rfile;
private final int defaultMaxIO;
private int maxIO;
private final int maxIO;
public NIOSequentialFile(final SequentialFileFactory factory,
final File directory,
@ -50,7 +51,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
final int maxIO,
final Executor writerExecutor) {
super(directory, file, factory, writerExecutor);
defaultMaxIO = maxIO;
this.maxIO = maxIO;
}
@Override
@ -69,7 +70,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
*/
@Override
public synchronized void open() throws IOException {
open(defaultMaxIO, true);
open(maxIO, true);
}
@Override
@ -90,31 +91,38 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
@Override
public void fill(final int size) throws IOException {
ByteBuffer bb = ByteBuffer.allocate(size);
bb.limit(size);
bb.position(0);
try {
channel.position(0);
channel.write(bb);
channel.force(false);
channel.position(0);
//uses the most common OS page size to match the Page Cache entry size and reduce JVM memory footprint
final int zeroPageCapacity = Env.osPageSize();
final ByteBuffer zeroPage = this.factory.newBuffer(zeroPageCapacity);
try {
int bytesToWrite = size;
long writePosition = 0;
while (bytesToWrite > 0) {
zeroPage.clear();
final int zeroPageLimit = Math.min(bytesToWrite, zeroPageCapacity);
zeroPage.limit(zeroPageLimit);
//use the cheaper pwrite instead of fseek + fwrite
final int writtenBytes = channel.write(zeroPage, writePosition);
bytesToWrite -= writtenBytes;
writePosition += writtenBytes;
}
if (factory.isDatasync()) {
channel.force(true);
}
//set the position to 0 to match the fill contract
channel.position(0);
fileSize = size;
} finally {
//return it to the factory
this.factory.releaseBuffer(zeroPage);
}
} catch (ClosedChannelException e) {
throw e;
} catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
channel.force(true);
fileSize = channel.size();
}
public synchronized void waitForClose() throws InterruptedException {
while (isOpen()) {
wait();
}
}
@Override
@ -247,10 +255,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
internalWrite(bytes, sync, null);
}
public void writeInternal(final ByteBuffer bytes) throws Exception {
internalWrite(bytes, true, null);
}
@Override
protected ByteBuffer newBuffer(int size, final int limit) {
// For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
@ -293,14 +297,51 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
private void doInternalWrite(final ByteBuffer bytes,
final boolean sync,
final IOCallback callback) throws IOException {
channel.write(bytes);
try {
channel.write(bytes);
if (sync) {
sync();
if (sync) {
sync();
}
if (callback != null) {
callback.done();
}
} finally {
//release it to recycle the write buffer if big enough
this.factory.releaseBuffer(bytes);
}
}
if (callback != null) {
callback.done();
@Override
public void copyTo(SequentialFile dstFile) throws IOException {
if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + dstFile);
}
if (isOpen()) {
throw new IllegalStateException("File opened!");
}
if (dstFile.isOpen()) {
throw new IllegalArgumentException("dstFile must be closed too");
}
try (RandomAccessFile src = new RandomAccessFile(getFile(), "rw");
FileChannel srcChannel = src.getChannel();
FileLock srcLock = srcChannel.lock()) {
final long readableBytes = srcChannel.size();
if (readableBytes > 0) {
try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
FileChannel dstChannel = dst.getChannel();
FileLock dstLock = dstChannel.lock()) {
final long oldLength = dst.length();
final long newLength = oldLength + readableBytes;
dst.setLength(newLength);
final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes);
if (transferred != readableBytes) {
dstChannel.truncate(oldLength);
throw new IOException("copied less then expected");
}
}
}
}
}
}

View File

@ -19,13 +19,23 @@ package org.apache.activemq.artemis.core.io.nio;
import java.io.File;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Arrays;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.utils.Env;
public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
private static final int DEFAULT_CAPACITY_ALIGNMENT = Env.osPageSize();
private boolean bufferPooling;
//pools only the biggest one -> optimized for the common case
private final ThreadLocal<ByteBuffer> bytesPool;
public NIOSequentialFileFactory(final File journalDir, final int maxIO) {
this(journalDir, null, maxIO);
@ -63,6 +73,8 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
final boolean logRates,
final IOCriticalErrorListener listener) {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener);
this.bufferPooling = true;
this.bytesPool = new ThreadLocal<>();
}
public static ByteBuffer allocateDirectByteBuffer(final int size) {
@ -91,6 +103,14 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return buffer2;
}
public void enableBufferReuse() {
this.bufferPooling = true;
}
public void disableBufferReuse() {
this.bufferPooling = false;
}
@Override
public SequentialFile createSequentialFile(final String fileName) {
return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
@ -101,31 +121,71 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return timedBuffer != null;
}
private static int align(final int value, final int pow2alignment) {
return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
}
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
byteBuffer.limit(size);
return byteBuffer;
}
@Override
public void releaseDirectBuffer(ByteBuffer buffer) {
// nothing we can do on this case. we can just have good faith on GC
PlatformDependent.freeDirectBuffer(buffer);
}
@Override
public ByteBuffer newBuffer(final int size) {
return ByteBuffer.allocate(size);
if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
//do not free the old one (if any) until the new one will be released into the pool!
byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
} else {
bytesPool.set(null);
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
byteBuffer.clear();
}
byteBuffer.limit(size);
return byteBuffer;
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
if (this.bufferPooling) {
if (buffer.isDirect()) {
final ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer != buffer) {
//replace with the current pooled only if greater or null
if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
if (byteBuffer != null) {
//free the smaller one
PlatformDependent.freeDirectBuffer(byteBuffer);
}
bytesPool.set(buffer);
} else {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
}
}
@Override
public void clearBuffer(final ByteBuffer buffer) {
final int limit = buffer.limit();
buffer.rewind();
for (int i = 0; i < limit; i++) {
buffer.put((byte) 0);
if (buffer.isDirect()) {
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer), buffer.limit(), (byte) 0);
} else {
Arrays.fill(buffer.array(), buffer.arrayOffset(), buffer.limit(), (byte) 0);
}
buffer.rewind();
}
@Override

View File

@ -19,8 +19,6 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -205,162 +203,4 @@ public class TimedBufferTest extends ActiveMQTestBase {
}
}
/**
* This test will verify if the system will switch to spin case the system can't perform sleeps timely
* due to proper kernel installations
*
* @throws Exception
*/
@Test
public void testVerifySwitchToSpin() throws Exception {
class TestObserver implements TimedBufferObserver {
@Override
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
*/
@Override
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
return ByteBuffer.allocate(maxSize);
}
@Override
public int getRemainingBytes() {
return 1024 * 1024;
}
}
final CountDownLatch sleptLatch = new CountDownLatch(1);
TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) {
@Override
protected void stopSpin() {
// keeps spinning forever
}
@Override
protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
Thread.sleep(10);
}
@Override
public synchronized void setUseSleep(boolean param) {
super.setUseSleep(param);
sleptLatch.countDown();
}
};
timedBuffer.start();
try {
timedBuffer.setObserver(new TestObserver());
int x = 0;
byte[] bytes = new byte[10];
for (int j = 0; j < 10; j++) {
bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
}
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
timedBuffer.checkSize(10);
timedBuffer.addBytes(buff, true, dummyCallback);
sleptLatch.await(10, TimeUnit.SECONDS);
assertFalse(timedBuffer.isUseSleep());
} finally {
timedBuffer.stop();
}
}
/**
* This test will verify if the system will switch to spin case the system can't perform sleeps timely
* due to proper kernel installations
*
* @throws Exception
*/
@Test
public void testStillSleeps() throws Exception {
class TestObserver implements TimedBufferObserver {
@Override
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
*/
@Override
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
return ByteBuffer.allocate(maxSize);
}
@Override
public int getRemainingBytes() {
return 1024 * 1024;
}
}
final CountDownLatch sleptLatch = new CountDownLatch(TimedBuffer.MAX_CHECKS_ON_SLEEP);
TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) {
@Override
protected void stopSpin() {
// keeps spinning forever
}
@Override
protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
sleptLatch.countDown();
// no sleep
}
@Override
public synchronized void setUseSleep(boolean param) {
super.setUseSleep(param);
sleptLatch.countDown();
}
};
timedBuffer.start();
try {
timedBuffer.setObserver(new TestObserver());
int x = 0;
byte[] bytes = new byte[10];
for (int j = 0; j < 10; j++) {
bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
}
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
timedBuffer.checkSize(10);
timedBuffer.addBytes(buff, true, dummyCallback);
// waits all the sleeps to be done
sleptLatch.await(10, TimeUnit.SECONDS);
// keeps waiting a bit longer
Thread.sleep(100);
assertTrue(timedBuffer.isUseSleep());
} finally {
timedBuffer.stop();
}
}
}