This commit is contained in:
Clebert Suconic 2020-07-23 15:45:10 -04:00
commit e3d44eef1b
8 changed files with 53 additions and 34 deletions

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.jboss.logging.Logger;
public abstract class AbstractSequentialFile implements SequentialFile {
@ -256,12 +257,11 @@ public abstract class AbstractSequentialFile implements SequentialFile {
return file;
}
protected ByteBuffer newBuffer(int size, int limit) {
size = factory.calculateBlockSize(size);
limit = factory.calculateBlockSize(limit);
protected ByteBuffer newBuffer(int requiredCapacity, boolean zeroed) {
final int alignedRequiredCapacity = factory.calculateBlockSize(requiredCapacity);
ByteBuffer buffer = factory.newBuffer(size);
buffer.limit(limit);
ByteBuffer buffer = factory.newBuffer(alignedRequiredCapacity, zeroed);
buffer.limit(alignedRequiredCapacity);
return buffer;
}
@ -271,9 +271,15 @@ public abstract class AbstractSequentialFile implements SequentialFile {
public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List<IOCallback> callbacks) {
final int bytes = byteBuf.readableBytes();
if (bytes > 0) {
final ByteBuffer buffer = newBuffer(byteBuf.capacity(), bytes);
final ByteBuffer buffer = newBuffer(bytes, false);
final int alignedLimit = buffer.limit();
assert alignedLimit == factory.calculateBlockSize(bytes);
buffer.limit(bytes);
byteBuf.getBytes(byteBuf.readerIndex(), buffer);
final int missingNonZeroedBytes = alignedLimit - bytes;
if (missingNonZeroedBytes > 0) {
ByteUtil.zeros(buffer, bytes, missingNonZeroedBytes);
}
buffer.flip();
writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
} else {

View File

@ -74,6 +74,18 @@ public interface SequentialFileFactory {
*/
ByteBuffer newBuffer(int size);
/**
* Note: You need to release the buffer if is used for reading operations. You don't need to do
* it if using writing operations (AIO Buffer Lister will take of writing operations)
*
* @param size
* @param zeroed if {@code true} the returned {@link ByteBuffer} must be zeroed, otherwise it tries to save zeroing it.
* @return the allocated ByteBuffer
*/
default ByteBuffer newBuffer(int size, boolean zeroed) {
return newBuffer(size);
}
void releaseBuffer(ByteBuffer buffer);
void activateBuffer(SequentialFile file);

View File

@ -337,19 +337,6 @@ public class AIOSequentialFile extends AbstractSequentialFile {
return "AIOSequentialFile:" + getFile().getAbsolutePath();
}
// Protected methods
// -----------------------------------------------------------------------------------------------------
@Override
protected ByteBuffer newBuffer(int size, int limit) {
size = factory.calculateBlockSize(size);
limit = factory.calculateBlockSize(limit);
ByteBuffer buffer = factory.newBuffer(size);
buffer.limit(limit);
return buffer;
}
// Private methods
// -----------------------------------------------------------------------------------------------------

View File

@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -30,11 +31,12 @@ import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
@ -181,14 +183,25 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override
public ByteBuffer newBuffer(int size) {
return newBuffer(size, true);
}
@Override
public ByteBuffer newBuffer(int size, boolean zeroed) {
final int alignedSize = calculateBlockSize(size);
return buffersControl.newBuffer(alignedSize, true);
return buffersControl.newBuffer(alignedSize, zeroed);
}
@Override
public void clearBuffer(final ByteBuffer directByteBuffer) {
directByteBuffer.position(0);
libaioContext.memsetBuffer(directByteBuffer);
if (PlatformDependent.hasUnsafe()) {
// that's the same semantic of libaioContext.memsetBuffer: it hasn't any JNI cost
ByteUtil.zeros(directByteBuffer, 0, directByteBuffer.limit());
} else {
// JNI cost
libaioContext.memsetBuffer(directByteBuffer);
}
}
@Override

View File

@ -100,10 +100,15 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
@Override
public ByteBuffer newBuffer(int size) {
return newBuffer(size, true);
}
@Override
public ByteBuffer newBuffer(int size, boolean zeroed) {
if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
return bytesPool.borrow(size, true);
return bytesPool.borrow(size, zeroed);
}
}

View File

@ -261,7 +261,7 @@ final class TimedSequentialFile implements SequentialFile {
} else {
//perform the copy on buffer
releaseBuffer = true;
buffer = factory.newBuffer(byteBuf.capacity());
buffer = factory.newBuffer(byteBuf.capacity(), false);
buffer.limit(bytes);
byteBuf.getBytes(byteBuf.readerIndex(), buffer);
buffer.flip();

View File

@ -387,15 +387,6 @@ public class NIOSequentialFile extends AbstractSequentialFile {
internalWrite(bytes, sync, null, releaseBuffer);
}
@Override
protected ByteBuffer newBuffer(int size, final int limit) {
// For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
size = limit;
return super.newBuffer(size, limit);
}
private void internalWrite(final ByteBuffer bytes,
final boolean sync,
final IOCallback callback,

View File

@ -139,10 +139,15 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
@Override
public ByteBuffer newBuffer(int size) {
return newBuffer(size, true);
}
@Override
public ByteBuffer newBuffer(int size, boolean zeroed) {
if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
return bytesPool.borrow(size, true);
return bytesPool.borrow(size, zeroed);
}
}