This commit is contained in:
Clebert Suconic 2017-05-08 11:57:41 -04:00
commit 6984d39a99
7 changed files with 323 additions and 354 deletions

View File

@ -181,6 +181,7 @@ public class SyncCalculation {
case NIO: case NIO:
factory = new NIOSequentialFileFactory(datafolder, 1).setDatasync(datasync); factory = new NIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
((NIOSequentialFileFactory) factory).disableBufferReuse();
factory.start(); factory.start();
return factory; return factory;
case ASYNCIO: case ASYNCIO:

View File

@ -17,19 +17,48 @@
package org.apache.activemq.artemis.utils; package org.apache.activemq.artemis.utils;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
/** /**
* Utility that detects various properties specific to the current runtime * Utility that detects various properties specific to the current runtime
* environment, such as JVM bitness and OS type. * environment, such as JVM bitness and OS type.
*/ */
public final class Env { public final class Env {
/** The system will change a few logs and semantics to be suitable to private static final int OS_PAGE_SIZE;
static {
//most common OS page size value
int osPageSize = 4096;
sun.misc.Unsafe instance;
try {
Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
instance = (sun.misc.Unsafe) field.get((Object) null);
} catch (Throwable t) {
try {
Constructor<sun.misc.Unsafe> c = sun.misc.Unsafe.class.getDeclaredConstructor(new Class[0]);
c.setAccessible(true);
instance = c.newInstance(new Object[0]);
} catch (Throwable t1) {
instance = null;
}
}
if (instance != null) {
osPageSize = instance.pageSize();
}
OS_PAGE_SIZE = osPageSize;
}
/**
* The system will change a few logs and semantics to be suitable to
* run a long testsuite. * run a long testsuite.
* Like a few log entries that are only valid during a production system. * Like a few log entries that are only valid during a production system.
* or a few cases we need to know as warn on the testsuite and as log in production. */ * or a few cases we need to know as warn on the testsuite and as log in production.
*/
private static boolean testEnv = false; private static boolean testEnv = false;
private static final String OS = System.getProperty("os.name").toLowerCase(); private static final String OS = System.getProperty("os.name").toLowerCase();
private static final boolean IS_LINUX = OS.startsWith("linux"); private static final boolean IS_LINUX = OS.startsWith("linux");
private static final boolean IS_64BIT = checkIs64bit(); private static final boolean IS_64BIT = checkIs64bit();
@ -38,6 +67,14 @@ public final class Env {
} }
/**
* Return the size in bytes of a OS memory page.
* This value will always be a power of two.
*/
public static int osPageSize() {
return OS_PAGE_SIZE;
}
public static boolean isTestEnv() { public static boolean isTestEnv() {
return testEnv; return testEnv;
} }

View File

@ -189,9 +189,12 @@ public abstract class AbstractSequentialFile implements SequentialFile {
bytes.setIndex(0, bytes.capacity()); bytes.setIndex(0, bytes.capacity());
timedBuffer.addBytes(bytes, sync, callback); timedBuffer.addBytes(bytes, sync, callback);
} else { } else {
ByteBuffer buffer = factory.newBuffer(bytes.capacity()); final int readableBytes = bytes.readableBytes();
buffer.put(bytes.toByteBuffer().array()); final ByteBuffer buffer = factory.newBuffer(readableBytes);
buffer.rewind(); //factory::newBuffer doesn't necessary return a buffer with limit == readableBytes!!
buffer.limit(readableBytes);
bytes.getBytes(bytes.readerIndex(), buffer);
buffer.flip();
writeDirect(buffer, sync, callback); writeDirect(buffer, sync, callback);
} }
} }
@ -215,15 +218,12 @@ public abstract class AbstractSequentialFile implements SequentialFile {
if (timedBuffer != null) { if (timedBuffer != null) {
timedBuffer.addBytes(bytes, sync, callback); timedBuffer.addBytes(bytes, sync, callback);
} else { } else {
ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize()); final int encodedSize = bytes.getEncodeSize();
ByteBuffer buffer = factory.newBuffer(encodedSize);
// If not using the TimedBuffer, a final copy is necessary
// Because AIO will need a specific Buffer
// And NIO will also need a whole buffer to perform the write
ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer); ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer);
bytes.encode(outBuffer); bytes.encode(outBuffer);
buffer.rewind(); buffer.clear();
buffer.limit(encodedSize);
writeDirect(buffer, sync, callback); writeDirect(buffer, sync, callback);
} }
} }
@ -255,9 +255,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
@Override @Override
public void done() { public void done() {
for (IOCallback callback : delegates) { final int size = delegates.size();
for (int i = 0; i < size; i++) {
try { try {
callback.done(); delegates.get(i).done();
} catch (Throwable e) { } catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e); ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
} }
@ -266,9 +267,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
@Override @Override
public void onError(final int errorCode, final String errorMessage) { public void onError(final int errorCode, final String errorMessage) {
for (IOCallback callback : delegates) { final int size = delegates.size();
for (int i = 0; i < size; i++) {
try { try {
callback.onError(errorCode, errorMessage); delegates.get(i).onError(errorCode, errorMessage);
} catch (Throwable e) { } catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e); ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
} }

View File

@ -18,27 +18,25 @@ package org.apache.activemq.artemis.core.io.buffer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public class TimedBuffer { public final class TimedBuffer {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
// The number of tries on sleep before switching to spin
public static final int MAX_CHECKS_ON_SLEEP = 20;
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
private TimedBufferObserver bufferObserver; private TimedBufferObserver bufferObserver;
@ -58,10 +56,9 @@ public class TimedBuffer {
private List<IOCallback> callbacks; private List<IOCallback> callbacks;
private volatile int timeout; private final int timeout;
// used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen private final AtomicLong pendingSyncs = new AtomicLong();
private volatile boolean pendingSync = false;
private Thread timerThread; private Thread timerThread;
@ -76,7 +73,7 @@ public class TimedBuffer {
private final boolean logRates; private final boolean logRates;
private final AtomicLong bytesFlushed = new AtomicLong(0); private long bytesFlushed = 0;
private final AtomicLong flushesDone = new AtomicLong(0); private final AtomicLong flushesDone = new AtomicLong(0);
@ -84,8 +81,6 @@ public class TimedBuffer {
private TimerTask logRatesTimerTask; private TimerTask logRatesTimerTask;
private boolean useSleep = true;
// no need to be volatile as every access is synchronized // no need to be volatile as every access is synchronized
private boolean spinning = false; private boolean spinning = false;
@ -104,27 +99,18 @@ public class TimedBuffer {
logRatesTimer = new Timer(true); logRatesTimer = new Timer(true);
} }
// Setting the interval for nano-sleeps // Setting the interval for nano-sleeps
//prefer off heap buffer to allow further humongous allocations and reduce GC overhead
buffer = ActiveMQBuffers.fixedBuffer(bufferSize); buffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size));
buffer.clear(); buffer.clear();
bufferLimit = 0; bufferLimit = 0;
callbacks = new ArrayList<>(); callbacks = null;
this.timeout = timeout; this.timeout = timeout;
} }
// for Debug purposes
public synchronized boolean isUseSleep() {
return useSleep;
}
public synchronized void setUseSleep(boolean useSleep) {
this.useSleep = useSleep;
}
public synchronized void start() { public synchronized void start() {
if (started) { if (started) {
return; return;
@ -232,7 +218,28 @@ public class TimedBuffer {
} }
public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) { public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) {
addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback); if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
}
delayFlush = false;
//it doesn't modify the reader index of bytes as in the original version
final int readableBytes = bytes.readableBytes();
final int writerIndex = buffer.writerIndex();
buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
buffer.writerIndex(writerIndex + readableBytes);
if (callbacks == null) {
callbacks = new ArrayList<>();
}
callbacks.add(callback);
if (sync) {
final long currentPendingSyncs = pendingSyncs.get();
pendingSyncs.lazySet(currentPendingSyncs + 1);
startSpin();
}
} }
public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) { public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) {
@ -244,11 +251,14 @@ public class TimedBuffer {
bytes.encode(buffer); bytes.encode(buffer);
if (callbacks == null) {
callbacks = new ArrayList<>();
}
callbacks.add(callback); callbacks.add(callback);
if (sync) { if (sync) {
pendingSync = true; final long currentPendingSyncs = pendingSyncs.get();
pendingSyncs.lazySet(currentPendingSyncs + 1);
startSpin(); startSpin();
} }
@ -262,44 +272,48 @@ public class TimedBuffer {
* force means the Journal is moving to a new file. Any pending write need to be done immediately * force means the Journal is moving to a new file. Any pending write need to be done immediately
* or data could be lost * or data could be lost
*/ */
public void flush(final boolean force) { private void flush(final boolean force) {
synchronized (this) { synchronized (this) {
if (!started) { if (!started) {
throw new IllegalStateException("TimedBuffer is not started"); throw new IllegalStateException("TimedBuffer is not started");
} }
if ((force || !delayFlush) && buffer.writerIndex() > 0) { if ((force || !delayFlush) && buffer.writerIndex() > 0) {
int pos = buffer.writerIndex(); final int pos = buffer.writerIndex();
if (logRates) { final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
bytesFlushed.addAndGet(pos); //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
} bufferToFlush.limit(pos);
//perform memcpy under the hood due to the off heap buffer
buffer.getBytes(0, bufferToFlush);
ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); final List<IOCallback> ioCallbacks = callbacks == null ? Collections.emptyList() : callbacks;
bufferObserver.flushBuffer(bufferToFlush, pendingSyncs.get() > 0, ioCallbacks);
// Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
// Using bufferToFlush.put(buffer) would make several append calls for each byte
// We also transfer the content of this buffer to the native file's buffer
bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
stopSpin(); stopSpin();
pendingSync = false; pendingSyncs.lazySet(0);
// swap the instance as the previous callback list is being used asynchronously callbacks = null;
callbacks = new LinkedList<>();
buffer.clear(); buffer.clear();
bufferLimit = 0; bufferLimit = 0;
flushesDone.incrementAndGet(); if (logRates) {
logFlushed(pos);
} }
} }
} }
}
private void logFlushed(int bytes) {
this.bytesFlushed += bytes;
//more lightweight than XADD if single writer
final long currentFlushesDone = flushesDone.get();
//flushesDone::lazySet write-Release bytesFlushed
flushesDone.lazySet(currentFlushesDone + 1L);
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
@ -324,21 +338,21 @@ public class TimedBuffer {
if (!closed) { if (!closed) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long bytesF = bytesFlushed.get(); final long flushesDone = TimedBuffer.this.flushesDone.get();
long flushesD = flushesDone.get(); //flushesDone::get read-Acquire bytesFlushed
final long bytesFlushed = TimedBuffer.this.bytesFlushed;
if (lastExecution != 0) { if (lastExecution != 0) {
double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution); final double rate = 1000 * (double) (bytesFlushed - lastBytesFlushed) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024))); ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution); final double flushRate = 1000 * (double) (flushesDone - lastFlushesDone) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.flushRate(flushRate); ActiveMQJournalLogger.LOGGER.flushRate(flushRate);
} }
lastExecution = now; lastExecution = now;
lastBytesFlushed = bytesF; lastBytesFlushed = bytesFlushed;
lastFlushesDone = flushesD; lastFlushesDone = flushesDone;
} }
} }
@ -354,84 +368,40 @@ public class TimedBuffer {
private volatile boolean closed = false; private volatile boolean closed = false;
int checks = 0;
int failedChecks = 0;
long timeBefore = 0;
final int sleepMillis = timeout / 1000000; // truncates
final int sleepNanos = timeout % 1000000;
@Override @Override
public void run() { public void run() {
int waitTimes = 0;
long lastFlushTime = 0; long lastFlushTime = 0;
long estimatedOptimalBatch = Runtime.getRuntime().availableProcessors();
final Semaphore spinLimiter = TimedBuffer.this.spinLimiter;
final long timeout = TimedBuffer.this.timeout;
while (!closed) { while (!closed) {
// We flush on the timer if there are pending syncs there and we've waited at least one boolean flushed = false;
// timeout since the time of the last flush. final long currentPendingSyncs = pendingSyncs.get();
// Effectively flushing "resets" the timer
// On the timeout verification, notice that we ignore the timeout check if we are using sleep
if (pendingSync) { if (currentPendingSyncs > 0) {
if (isUseSleep()) { if (bufferObserver != null) {
// if using sleep, we will always flush final boolean checkpoint = System.nanoTime() > lastFlushTime + timeout;
if (checkpoint || currentPendingSyncs >= estimatedOptimalBatch) {
flush(); flush();
if (checkpoint) {
estimatedOptimalBatch = currentPendingSyncs;
} else {
estimatedOptimalBatch = Math.max(estimatedOptimalBatch, currentPendingSyncs);
}
lastFlushTime = System.nanoTime(); lastFlushTime = System.nanoTime();
} else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) { //a flush has been requested
// if not using flush we will spin and do the time checks manually flushed = true;
flush();
lastFlushTime = System.nanoTime();
}
}
sleepIfPossible();
try {
spinLimiter.acquire();
Thread.yield();
spinLimiter.release();
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} }
} }
} }
/** if (flushed) {
* We will attempt to use sleep only if the system supports nano-sleep waitTimes = 0;
* we will on that case verify up to MAX_CHECKS if nano sleep is behaving well. } else {
* if more than 50% of the checks have failed we will cancel the sleep and just use regular spin //instead of interruptible sleeping, perform progressive parks depending on the load
*/ waitTimes = TimedBuffer.wait(waitTimes, spinLimiter);
private void sleepIfPossible() {
if (isUseSleep()) {
if (checks < MAX_CHECKS_ON_SLEEP) {
timeBefore = System.nanoTime();
}
try {
sleep(sleepMillis, sleepNanos);
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} catch (Exception e) {
setUseSleep(false);
ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
}
if (checks < MAX_CHECKS_ON_SLEEP) {
long realTimeSleep = System.nanoTime() - timeBefore;
// I'm letting the real time to be up to 50% than the requested sleep.
if (realTimeSleep > timeout * 1.5) {
failedChecks++;
}
if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
setUseSleep(false);
}
}
} }
} }
} }
@ -441,15 +411,33 @@ public class TimedBuffer {
} }
} }
/** private static int wait(int waitTimes, Semaphore spinLimiter) {
* Sub classes (tests basically) can use this to override how the sleep is being done if (waitTimes < 10) {
* //doesn't make sense to spin loop here, because of the lock around flush/addBytes operations!
* @param sleepMillis Thread.yield();
* @param sleepNanos waitTimes++;
* @throws InterruptedException } else if (waitTimes < 20) {
*/ LockSupport.parkNanos(1L);
protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException { waitTimes++;
Thread.sleep(sleepMillis, sleepNanos); } else if (waitTimes < 50) {
LockSupport.parkNanos(10L);
waitTimes++;
} else if (waitTimes < 100) {
LockSupport.parkNanos(100L);
waitTimes++;
} else if (waitTimes < 1000) {
LockSupport.parkNanos(1000L);
waitTimes++;
} else {
LockSupport.parkNanos(100_000L);
try {
spinLimiter.acquire();
spinLimiter.release();
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
return waitTimes;
} }
/** /**

View File

@ -22,6 +22,7 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -33,6 +34,8 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.Env;
public final class NIOSequentialFile extends AbstractSequentialFile { public final class NIOSequentialFile extends AbstractSequentialFile {
@ -40,9 +43,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
private RandomAccessFile rfile; private RandomAccessFile rfile;
private final int defaultMaxIO; private final int maxIO;
private int maxIO;
public NIOSequentialFile(final SequentialFileFactory factory, public NIOSequentialFile(final SequentialFileFactory factory,
final File directory, final File directory,
@ -50,7 +51,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
final int maxIO, final int maxIO,
final Executor writerExecutor) { final Executor writerExecutor) {
super(directory, file, factory, writerExecutor); super(directory, file, factory, writerExecutor);
defaultMaxIO = maxIO; this.maxIO = maxIO;
} }
@Override @Override
@ -69,7 +70,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
*/ */
@Override @Override
public synchronized void open() throws IOException { public synchronized void open() throws IOException {
open(defaultMaxIO, true); open(maxIO, true);
} }
@Override @Override
@ -90,31 +91,38 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
@Override @Override
public void fill(final int size) throws IOException { public void fill(final int size) throws IOException {
ByteBuffer bb = ByteBuffer.allocate(size);
bb.limit(size);
bb.position(0);
try { try {
//uses the most common OS page size to match the Page Cache entry size and reduce JVM memory footprint
final int zeroPageCapacity = Env.osPageSize();
final ByteBuffer zeroPage = this.factory.newBuffer(zeroPageCapacity);
try {
int bytesToWrite = size;
long writePosition = 0;
while (bytesToWrite > 0) {
zeroPage.clear();
final int zeroPageLimit = Math.min(bytesToWrite, zeroPageCapacity);
zeroPage.limit(zeroPageLimit);
//use the cheaper pwrite instead of fseek + fwrite
final int writtenBytes = channel.write(zeroPage, writePosition);
bytesToWrite -= writtenBytes;
writePosition += writtenBytes;
}
if (factory.isDatasync()) {
channel.force(true);
}
//set the position to 0 to match the fill contract
channel.position(0); channel.position(0);
channel.write(bb); fileSize = size;
channel.force(false); } finally {
channel.position(0); //return it to the factory
this.factory.releaseBuffer(zeroPage);
}
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
throw e; throw e;
} catch (IOException e) { } catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e; throw e;
} }
channel.force(true);
fileSize = channel.size();
}
public synchronized void waitForClose() throws InterruptedException {
while (isOpen()) {
wait();
}
} }
@Override @Override
@ -247,10 +255,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
internalWrite(bytes, sync, null); internalWrite(bytes, sync, null);
} }
public void writeInternal(final ByteBuffer bytes) throws Exception {
internalWrite(bytes, true, null);
}
@Override @Override
protected ByteBuffer newBuffer(int size, final int limit) { protected ByteBuffer newBuffer(int size, final int limit) {
// For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
@ -293,6 +297,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
private void doInternalWrite(final ByteBuffer bytes, private void doInternalWrite(final ByteBuffer bytes,
final boolean sync, final boolean sync,
final IOCallback callback) throws IOException { final IOCallback callback) throws IOException {
try {
channel.write(bytes); channel.write(bytes);
if (sync) { if (sync) {
@ -302,5 +307,41 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
if (callback != null) { if (callback != null) {
callback.done(); callback.done();
} }
} finally {
//release it to recycle the write buffer if big enough
this.factory.releaseBuffer(bytes);
}
}
@Override
public void copyTo(SequentialFile dstFile) throws IOException {
if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + dstFile);
}
if (isOpen()) {
throw new IllegalStateException("File opened!");
}
if (dstFile.isOpen()) {
throw new IllegalArgumentException("dstFile must be closed too");
}
try (RandomAccessFile src = new RandomAccessFile(getFile(), "rw");
FileChannel srcChannel = src.getChannel();
FileLock srcLock = srcChannel.lock()) {
final long readableBytes = srcChannel.size();
if (readableBytes > 0) {
try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
FileChannel dstChannel = dst.getChannel();
FileLock dstLock = dstChannel.lock()) {
final long oldLength = dst.length();
final long newLength = oldLength + readableBytes;
dst.setLength(newLength);
final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes);
if (transferred != readableBytes) {
dstChannel.truncate(oldLength);
throw new IOException("copied less then expected");
}
}
}
}
} }
} }

View File

@ -19,13 +19,23 @@ package org.apache.activemq.artemis.core.io.nio;
import java.io.File; import java.io.File;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.utils.Env;
public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
private static final int DEFAULT_CAPACITY_ALIGNMENT = Env.osPageSize();
private boolean bufferPooling;
//pools only the biggest one -> optimized for the common case
private final ThreadLocal<ByteBuffer> bytesPool;
public NIOSequentialFileFactory(final File journalDir, final int maxIO) { public NIOSequentialFileFactory(final File journalDir, final int maxIO) {
this(journalDir, null, maxIO); this(journalDir, null, maxIO);
@ -63,6 +73,8 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
final boolean logRates, final boolean logRates,
final IOCriticalErrorListener listener) { final IOCriticalErrorListener listener) {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener); super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener);
this.bufferPooling = true;
this.bytesPool = new ThreadLocal<>();
} }
public static ByteBuffer allocateDirectByteBuffer(final int size) { public static ByteBuffer allocateDirectByteBuffer(final int size) {
@ -91,6 +103,14 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return buffer2; return buffer2;
} }
public void enableBufferReuse() {
this.bufferPooling = true;
}
public void disableBufferReuse() {
this.bufferPooling = false;
}
@Override @Override
public SequentialFile createSequentialFile(final String fileName) { public SequentialFile createSequentialFile(final String fileName) {
return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor); return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
@ -101,31 +121,71 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return timedBuffer != null; return timedBuffer != null;
} }
private static int align(final int value, final int pow2alignment) {
return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
}
@Override @Override
public ByteBuffer allocateDirectBuffer(final int size) { public ByteBuffer allocateDirectBuffer(final int size) {
return NIOSequentialFileFactory.allocateDirectByteBuffer(size); final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
byteBuffer.limit(size);
return byteBuffer;
} }
@Override @Override
public void releaseDirectBuffer(ByteBuffer buffer) { public void releaseDirectBuffer(ByteBuffer buffer) {
// nothing we can do on this case. we can just have good faith on GC PlatformDependent.freeDirectBuffer(buffer);
} }
@Override @Override
public ByteBuffer newBuffer(final int size) { public ByteBuffer newBuffer(final int size) {
return ByteBuffer.allocate(size); if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
//do not free the old one (if any) until the new one will be released into the pool!
byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
} else {
bytesPool.set(null);
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
byteBuffer.clear();
}
byteBuffer.limit(size);
return byteBuffer;
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
if (this.bufferPooling) {
if (buffer.isDirect()) {
final ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer != buffer) {
//replace with the current pooled only if greater or null
if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
if (byteBuffer != null) {
//free the smaller one
PlatformDependent.freeDirectBuffer(byteBuffer);
}
bytesPool.set(buffer);
} else {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
}
} }
@Override @Override
public void clearBuffer(final ByteBuffer buffer) { public void clearBuffer(final ByteBuffer buffer) {
final int limit = buffer.limit(); if (buffer.isDirect()) {
buffer.rewind(); PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer), buffer.limit(), (byte) 0);
} else {
for (int i = 0; i < limit; i++) { Arrays.fill(buffer.array(), buffer.arrayOffset(), buffer.limit(), (byte) 0);
buffer.put((byte) 0);
} }
buffer.rewind();
} }
@Override @Override

View File

@ -19,8 +19,6 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -205,162 +203,4 @@ public class TimedBufferTest extends ActiveMQTestBase {
} }
} }
/**
* This test will verify if the system will switch to spin case the system can't perform sleeps timely
* due to proper kernel installations
*
* @throws Exception
*/
@Test
public void testVerifySwitchToSpin() throws Exception {
class TestObserver implements TimedBufferObserver {
@Override
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
*/
@Override
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
return ByteBuffer.allocate(maxSize);
}
@Override
public int getRemainingBytes() {
return 1024 * 1024;
}
}
final CountDownLatch sleptLatch = new CountDownLatch(1);
TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) {
@Override
protected void stopSpin() {
// keeps spinning forever
}
@Override
protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
Thread.sleep(10);
}
@Override
public synchronized void setUseSleep(boolean param) {
super.setUseSleep(param);
sleptLatch.countDown();
}
};
timedBuffer.start();
try {
timedBuffer.setObserver(new TestObserver());
int x = 0;
byte[] bytes = new byte[10];
for (int j = 0; j < 10; j++) {
bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
}
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
timedBuffer.checkSize(10);
timedBuffer.addBytes(buff, true, dummyCallback);
sleptLatch.await(10, TimeUnit.SECONDS);
assertFalse(timedBuffer.isUseSleep());
} finally {
timedBuffer.stop();
}
}
/**
* This test will verify if the system will switch to spin case the system can't perform sleeps timely
* due to proper kernel installations
*
* @throws Exception
*/
@Test
public void testStillSleeps() throws Exception {
class TestObserver implements TimedBufferObserver {
@Override
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
}
/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
*/
@Override
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
return ByteBuffer.allocate(maxSize);
}
@Override
public int getRemainingBytes() {
return 1024 * 1024;
}
}
final CountDownLatch sleptLatch = new CountDownLatch(TimedBuffer.MAX_CHECKS_ON_SLEEP);
TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) {
@Override
protected void stopSpin() {
// keeps spinning forever
}
@Override
protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
sleptLatch.countDown();
// no sleep
}
@Override
public synchronized void setUseSleep(boolean param) {
super.setUseSleep(param);
sleptLatch.countDown();
}
};
timedBuffer.start();
try {
timedBuffer.setObserver(new TestObserver());
int x = 0;
byte[] bytes = new byte[10];
for (int j = 0; j < 10; j++) {
bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
}
ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
timedBuffer.checkSize(10);
timedBuffer.addBytes(buff, true, dummyCallback);
// waits all the sleeps to be done
sleptLatch.await(10, TimeUnit.SECONDS);
// keeps waiting a bit longer
Thread.sleep(100);
assertTrue(timedBuffer.isUseSleep());
} finally {
timedBuffer.stop();
}
}
} }