This commit is contained in:
Clebert Suconic 2017-10-09 14:32:48 -04:00
commit c94ca2d43a
30 changed files with 291 additions and 163 deletions

View File

@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import io.airlift.airline.Arguments; import io.airlift.airline.Arguments;
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.airlift.airline.Option; import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.cli.CLIException; import org.apache.activemq.artemis.cli.CLIException;
import org.apache.activemq.artemis.cli.commands.util.HashUtil; import org.apache.activemq.artemis.cli.commands.util.HashUtil;
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
@ -723,6 +724,16 @@ public class Create extends InputAbstract {
} }
private void setupJournalType() { private void setupJournalType() {
if (noJournalSync && !mapped) {
boolean useMapped = inputBoolean("--mapped", "Since you disabled syncs, it is recommended to use the Mapped Memory Journal. Do you want to use the Memory Mapped Journal", true);
if (useMapped) {
mapped = true;
nio = false;
aio = false;
}
}
int countJournalTypes = countBoolean(aio, nio, mapped); int countJournalTypes = countBoolean(aio, nio, mapped);
if (countJournalTypes > 1) { if (countJournalTypes > 1) {
throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped)."); throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped).");
@ -803,7 +814,18 @@ public class Create extends InputAbstract {
System.out.println(""); System.out.println("");
System.out.println("Auto tuning journal ..."); System.out.println("Auto tuning journal ...");
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, journalType); if (mapped && noJournalSync) {
HashMap<String, String> syncFilter = new HashMap<>();
syncFilter.put("${nanoseconds}", "0");
syncFilter.put("${writesPerMillisecond}", "0");
syncFilter.put("${maxaio}", journalType == JournalType.ASYNCIO ? "" + ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio() : "1");
System.out.println("...Since you disabled sync and are using MAPPED journal, we are diabling buffer times");
filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
} else {
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), journalType);
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose); long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
double writesPerMillisecond = (double) writes / (double) time; double writesPerMillisecond = (double) writes / (double) time;
@ -812,11 +834,14 @@ public class Create extends InputAbstract {
HashMap<String, String> syncFilter = new HashMap<>(); HashMap<String, String> syncFilter = new HashMap<>();
syncFilter.put("${nanoseconds}", Long.toString(nanoseconds)); syncFilter.put("${nanoseconds}", Long.toString(nanoseconds));
syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr); syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr);
syncFilter.put("${maxaio}", journalType == JournalType.ASYNCIO ? "" + ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio() : "1");
System.out.println("done! Your system can make " + writesPerMillisecondStr + System.out.println("done! Your system can make " + writesPerMillisecondStr +
" writes per millisecond, your journal-buffer-timeout will be " + nanoseconds); " writes per millisecond, your journal-buffer-timeout will be " + nanoseconds);
filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter)); filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
}
} catch (Exception e) { } catch (Exception e) {
filters.put("${journal-buffer.settings}", ""); filters.put("${journal-buffer.settings}", "");

View File

@ -22,13 +22,13 @@ import java.text.DecimalFormat;
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.airlift.airline.Option; import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract; import org.apache.activemq.artemis.cli.commands.tools.OptionalLocking;
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
@Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder") @Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder")
public class PerfJournal extends LockAbstract { public class PerfJournal extends OptionalLocking {
@Option(name = "--block-size", description = "The block size for each write (default 4096)") @Option(name = "--block-size", description = "The block size for each write (default 4096)")
@ -49,6 +49,15 @@ public class PerfJournal extends LockAbstract {
@Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)") @Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)")
public String journalType = null; public String journalType = null;
@Option(name = "--sync-writes", description = "It will perform each write synchronously, like if you had a single producer")
public boolean syncWrites = false;
@Option(name = "--file", description = "The file name to be used (default test.tmp)")
public String fileName = "test.tmp";
@Option(name = "--max-aio", description = "libaio.maxAIO to be used (default: configuration::getJournalMaxIO_AIO()")
public int maxAIO = 0;
@Override @Override
public Object execute(ActionContext context) throws Exception { public Object execute(ActionContext context) throws Exception {
@ -74,7 +83,11 @@ public class PerfJournal extends LockAbstract {
fileConfiguration.getJournalLocation().mkdirs(); fileConfiguration.getJournalLocation().mkdirs();
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType()); if (maxAIO <= 0) {
maxAIO = fileConfiguration.getJournalMaxIO_AIO();
}
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), syncWrites, fileName, maxAIO, fileConfiguration.getJournalType());
long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose); long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
double writesPerMillisecond = (double) writes / (double) time; double writesPerMillisecond = (double) writes / (double) time;

View File

@ -50,13 +50,16 @@ public class SyncCalculation {
int tries, int tries,
boolean verbose, boolean verbose,
boolean fsync, boolean fsync,
boolean syncWrites,
String fileName,
int maxAIO,
JournalType journalType) throws Exception { JournalType journalType) throws Exception {
SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks); SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks, maxAIO);
if (verbose) { if (verbose) {
System.out.println("Using " + factory.getClass().getName() + " to calculate sync times"); System.out.println("Using " + factory.getClass().getName() + " to calculate sync times, alignment=" + factory.getAlignment());
} }
SequentialFile file = factory.createSequentialFile("test.tmp"); SequentialFile file = factory.createSequentialFile(fileName);
try { try {
file.delete(); file.delete();
@ -106,10 +109,14 @@ public class SyncCalculation {
bufferBlock.position(0); bufferBlock.position(0);
latch.countUp(); latch.countUp();
file.writeDirect(bufferBlock, true, callback); file.writeDirect(bufferBlock, true, callback);
if (!latch.await(5, TimeUnit.SECONDS)) {
throw new IOException("Callback wasn't called"); if (syncWrites) {
flushLatch(latch);
} }
} }
if (!syncWrites) flushLatch(latch);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
result[ntry] = (end - start); result[ntry] = (end - start);
@ -150,6 +157,12 @@ public class SyncCalculation {
} }
} }
private static void flushLatch(ReusableLatch latch) throws InterruptedException, IOException {
if (!latch.await(5, TimeUnit.SECONDS)) {
throw new IOException("Timed out on receiving IO callback");
}
}
public static long toNanos(long time, long blocks, boolean verbose) { public static long toNanos(long time, long blocks, boolean verbose) {
double blocksPerMillisecond = (double) blocks / (double) (time); double blocksPerMillisecond = (double) blocks / (double) (time);
@ -169,7 +182,7 @@ public class SyncCalculation {
return timeWait; return timeWait;
} }
private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize) { private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize, int maxAIO) {
SequentialFileFactory factory; SequentialFileFactory factory;
if (journalType == JournalType.ASYNCIO && !LibaioContext.isLoaded()) { if (journalType == JournalType.ASYNCIO && !LibaioContext.isLoaded()) {
@ -184,12 +197,12 @@ public class SyncCalculation {
factory.start(); factory.start();
return factory; return factory;
case ASYNCIO: case ASYNCIO:
factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync); factory = new AIOSequentialFileFactory(datafolder, maxAIO).setDatasync(datasync);
factory.start(); factory.start();
((AIOSequentialFileFactory) factory).disableBufferReuse(); ((AIOSequentialFileFactory) factory).disableBufferReuse();
return factory; return factory;
case MAPPED: case MAPPED:
factory = MappedSequentialFileFactory.unbuffered(datafolder, fileSize, null) factory = new MappedSequentialFileFactory(datafolder, fileSize, false, 0, 0, null)
.setDatasync(datasync) .setDatasync(datasync)
.disableBufferReuse(); .disableBufferReuse();
factory.start(); factory.start();

View File

@ -50,7 +50,8 @@ under the License.
<journal-pool-files>-1</journal-pool-files> <journal-pool-files>-1</journal-pool-files>
${ping-config.settings}${journal-buffer.settings}${connector-config.settings} <journal-file-size>10M</journal-file-size>
${journal-buffer.settings}${ping-config.settings}${connector-config.settings}
<!-- how often we are looking for how many bytes are being used on the disk in ms --> <!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period> <disk-scan-period>5000</disk-scan-period>

View File

@ -3,6 +3,15 @@
This value was determined through a calculation. This value was determined through a calculation.
Your system could perform ${writesPerMillisecond} writes per millisecond Your system could perform ${writesPerMillisecond} writes per millisecond
on the current journal configuration. on the current journal configuration.
That translates as a sync write every ${nanoseconds} nanoseconds That translates as a sync write every ${nanoseconds} nanoseconds.
Note: If you specify 0 the system will perform writes directly to the disk.
We recommend this to be 0 if you are using journalType=MAPPED and ournal-datasync=false.
--> -->
<journal-buffer-timeout>${nanoseconds}</journal-buffer-timeout> <journal-buffer-timeout>${nanoseconds}</journal-buffer-timeout>
<!--
When using ASYNCIO, this will determine the writing queue depth for libaio.
-->
<journal-max-io>${maxaio}</journal-max-io>

View File

@ -1,3 +1,4 @@
<!-- <!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element. You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC> <network-check-NIC>theNicName</network-check-NIC>

View File

@ -123,7 +123,7 @@ public class ArtemisTest extends CliTestBase {
public void testSync() throws Exception { public void testSync() throws Exception {
int writes = 2; int writes = 2;
int tries = 5; int tries = 5;
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, JournalType.NIO); long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true, "file.tmp", 1, JournalType.NIO);
System.out.println(); System.out.println();
System.out.println("TotalAvg = " + totalAvg); System.out.println("TotalAvg = " + totalAvg);
long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false); long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false);

View File

@ -103,7 +103,7 @@ public final class ActiveMQDefaultConfiguration {
// These defaults are applied depending on whether the journal type // These defaults are applied depending on whether the journal type
// is NIO or AIO. // is NIO or AIO.
private static int DEFAULT_JOURNAL_MAX_IO_AIO = 500; private static int DEFAULT_JOURNAL_MAX_IO_AIO = 4096;
private static int DEFAULT_JOURNAL_POOL_FILES = -1; private static int DEFAULT_JOURNAL_POOL_FILES = -1;
private static int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO; private static int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO;
private static int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO; private static int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO;

View File

@ -163,7 +163,7 @@ public final class ActiveMQClient {
} }
if (globalThreadPool != null) { if (globalThreadPool != null) {
globalThreadPool.shutdown(); globalThreadPool.shutdownNow();
try { try {
if (!globalThreadPool.awaitTermination(time, unit)) { if (!globalThreadPool.awaitTermination(time, unit)) {
globalThreadPool.shutdownNow(); globalThreadPool.shutdownNow();
@ -177,7 +177,7 @@ public final class ActiveMQClient {
} }
if (globalScheduledThreadPool != null) { if (globalScheduledThreadPool != null) {
globalScheduledThreadPool.shutdown(); globalScheduledThreadPool.shutdownNow();
try { try {
if (!globalScheduledThreadPool.awaitTermination(time, unit)) { if (!globalScheduledThreadPool.awaitTermination(time, unit)) {
globalScheduledThreadPool.shutdownNow(); globalScheduledThreadPool.shutdownNow();

View File

@ -59,7 +59,7 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
protected volatile int alignment = -1; protected volatile int alignment = -1;
private final IOCriticalErrorListener critialErrorListener; protected final IOCriticalErrorListener critialErrorListener;
/** /**
* Asynchronous writes need to be done at another executor. * Asynchronous writes need to be done at another executor.
@ -88,6 +88,11 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
this.maxIO = maxIO; this.maxIO = maxIO;
} }
@Override
public long getBufferSize() {
return bufferSize;
}
@Override @Override
public int getAlignment() { public int getAlignment() {
if (alignment < 0) { if (alignment < 0) {

View File

@ -269,10 +269,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
} }
} }
@Override
public long getBufferSize() {
return bufferSize;
}
/** /**
* The same callback is used for Runnable executor. * The same callback is used for Runnable executor.
@ -416,6 +412,16 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
private boolean stopped = false; private boolean stopped = false;
private int alignedBufferSize = 0;
private int getAlignedBufferSize() {
if (alignedBufferSize == 0) {
alignedBufferSize = calculateBlockSize(bufferSize);
}
return alignedBufferSize;
}
public ByteBuffer newBuffer(final int size) { public ByteBuffer newBuffer(final int size) {
// if a new buffer wasn't requested in 10 seconds, we clear the queue // if a new buffer wasn't requested in 10 seconds, we clear the queue
// This is being done this way as we don't need another Timeout Thread // This is being done this way as we don't need another Timeout Thread
@ -433,26 +439,32 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
// if a buffer is bigger than the configured-bufferSize, we just create a new // if a buffer is bigger than the configured-bufferSize, we just create a new
// buffer. // buffer.
if (size > bufferSize) { if (size > getAlignedBufferSize()) {
return LibaioContext.newAlignedBuffer(size, getAlignment()); return LibaioContext.newAlignedBuffer(size, getAlignment());
} else { } else {
// We need to allocate buffers following the rules of the storage // We need to allocate buffers following the rules of the storage
// being used (AIO/NIO) // being used (AIO/NIO)
int alignedSize = calculateBlockSize(size); final int alignedSize;
if (size < getAlignedBufferSize()) {
alignedSize = getAlignedBufferSize();
} else {
alignedSize = calculateBlockSize(size);
}
// Try getting a buffer from the queue... // Try getting a buffer from the queue...
ByteBuffer buffer = reuseBuffersQueue.poll(); ByteBuffer buffer = reuseBuffersQueue.poll();
if (buffer == null) { if (buffer == null) {
// if empty create a new one. // if empty create a new one.
buffer = LibaioContext.newAlignedBuffer(size, getAlignment()); buffer = LibaioContext.newAlignedBuffer(alignedSize, getAlignment());
buffer.limit(alignedSize); buffer.limit(calculateBlockSize(size));
} else { } else {
clearBuffer(buffer); clearBuffer(buffer);
// set the limit of the buffer to the bufferSize being required // set the limit of the buffer to the bufferSize being required
buffer.limit(alignedSize); buffer.limit(calculateBlockSize(size));
} }
buffer.rewind(); buffer.rewind();
@ -484,7 +496,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
// If a buffer has any other than the configured bufferSize, the buffer // If a buffer has any other than the configured bufferSize, the buffer
// will be just sent to GC // will be just sent to GC
if (buffer.capacity() == bufferSize) { if (buffer.capacity() == getAlignedBufferSize()) {
reuseBuffersQueue.offer(buffer); reuseBuffersQueue.offer(buffer);
} else { } else {
releaseBuffer(buffer); releaseBuffer(buffer);

View File

@ -17,48 +17,33 @@
package org.apache.activemq.artemis.core.io.mapped; package org.apache.activemq.artemis.core.io.mapped;
import java.io.File; import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.Env;
public final class MappedSequentialFileFactory implements SequentialFileFactory { public final class MappedSequentialFileFactory extends AbstractSequentialFileFactory {
private final File directory;
private int capacity; private int capacity;
private final IOCriticalErrorListener criticalErrorListener;
private final TimedBuffer timedBuffer;
private boolean useDataSync;
private boolean bufferPooling; private boolean bufferPooling;
//pools only the biggest one -> optimized for the common case //pools only the biggest one -> optimized for the common case
private final ThreadLocal<ByteBuffer> bytesPool; private final ThreadLocal<ByteBuffer> bytesPool;
private final int bufferSize;
private MappedSequentialFileFactory(File directory, public MappedSequentialFileFactory(File directory,
int capacity, int capacity,
final boolean buffered, final boolean buffered,
final int bufferSize, final int bufferSize,
final int bufferTimeout, final int bufferTimeout,
IOCriticalErrorListener criticalErrorListener) { IOCriticalErrorListener criticalErrorListener) {
this.directory = directory;
super(directory, buffered, bufferSize, bufferTimeout, 1, false, criticalErrorListener);
this.capacity = capacity; this.capacity = capacity;
this.criticalErrorListener = criticalErrorListener; this.setDatasync(true);
this.useDataSync = true;
if (buffered && bufferTimeout > 0 && bufferSize > 0) {
timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, false);
} else {
timedBuffer = null;
}
this.bufferSize = bufferSize;
this.bufferPooling = true; this.bufferPooling = true;
this.bytesPool = new ThreadLocal<>(); this.bytesPool = new ThreadLocal<>();
} }
@ -72,23 +57,9 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
return capacity; return capacity;
} }
public static MappedSequentialFileFactory buffered(File directory,
int capacity,
final int bufferSize,
final int bufferTimeout,
IOCriticalErrorListener criticalErrorListener) {
return new MappedSequentialFileFactory(directory, capacity, true, bufferSize, bufferTimeout, criticalErrorListener);
}
public static MappedSequentialFileFactory unbuffered(File directory,
int capacity,
IOCriticalErrorListener criticalErrorListener) {
return new MappedSequentialFileFactory(directory, capacity, false, 0, 0, criticalErrorListener);
}
@Override @Override
public SequentialFile createSequentialFile(String fileName) { public SequentialFile createSequentialFile(String fileName) {
final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), capacity, criticalErrorListener); final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, journalDir, new File(journalDir, fileName), capacity, critialErrorListener);
if (this.timedBuffer == null) { if (this.timedBuffer == null) {
return mappedSequentialFile; return mappedSequentialFile;
} else { } else {
@ -96,49 +67,11 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
} }
} }
@Override
public MappedSequentialFileFactory setDatasync(boolean enabled) {
this.useDataSync = enabled;
return this;
}
@Override
public boolean isDatasync() {
return useDataSync;
}
@Override
public long getBufferSize() {
return bufferSize;
}
@Override
public int getMaxIO() {
return 1;
}
@Override
public List<String> listFiles(final String extension) throws Exception {
final FilenameFilter extensionFilter = (file, name) -> name.endsWith("." + extension);
final String[] fileNames = directory.list(extensionFilter);
if (fileNames == null) {
return Collections.EMPTY_LIST;
}
return Arrays.asList(fileNames);
}
@Override @Override
public boolean isSupportsCallbacks() { public boolean isSupportsCallbacks() {
return timedBuffer != null; return timedBuffer != null;
} }
@Override
public void onIOError(Exception exception, String message, SequentialFile file) {
if (criticalErrorListener != null) {
criticalErrorListener.onIOException(exception, message, file);
}
}
@Override @Override
public ByteBuffer allocateDirectBuffer(final int size) { public ByteBuffer allocateDirectBuffer(final int size) {
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize()); final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
@ -204,20 +137,11 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
} }
@Override @Override
public void activateBuffer(SequentialFile file) { public MappedSequentialFileFactory setDatasync(boolean enabled) {
if (timedBuffer != null) { super.setDatasync(enabled);
file.setTimedBuffer(timedBuffer); return this;
}
} }
@Override
public void deactivateBuffer() {
if (timedBuffer != null) {
// When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
timedBuffer.flush();
timedBuffer.setObserver(null);
}
}
@Override @Override
public ByteBuffer wrapBuffer(final byte[] bytes) { public ByteBuffer wrapBuffer(final byte[] bytes) {
@ -240,11 +164,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
return bytes; return bytes;
} }
@Override
public File getDirectory() {
return this.directory;
}
@Override @Override
public void clearBuffer(final ByteBuffer buffer) { public void clearBuffer(final ByteBuffer buffer) {
if (buffer.isDirect()) { if (buffer.isDirect()) {
@ -276,18 +195,4 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
} }
} }
@Override
public void createDirs() throws Exception {
boolean ok = directory.mkdirs();
if (!ok) {
throw new IOException("Failed to create directory " + directory);
}
}
@Override
public void flush() {
if (timedBuffer != null) {
timedBuffer.flush();
}
}
} }

View File

@ -203,9 +203,4 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor
return bytes; return bytes;
} }
@Override
public long getBufferSize() {
return bufferSize;
}
} }

View File

@ -2207,8 +2207,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
*/ */
@Override @Override
public long getMaxRecordSize() { public long getMaxRecordSize() {
if (fileFactory.getBufferSize() == 0) {
return getFileSize();
} else {
return Math.min(getFileSize(), fileFactory.getBufferSize()); return Math.min(getFileSize(), fileFactory.getBufferSize());
} }
}
private void flushExecutor(Executor executor) throws InterruptedException { private void flushExecutor(Executor executor) throws InterruptedException {

View File

@ -62,7 +62,7 @@ public class JournalTptBenchmark {
switch (type) { switch (type) {
case Mapped: case Mapped:
factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null) factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null)
.setDatasync(dataSync); .setDatasync(dataSync);
break; break;
case Nio: case Nio:

View File

@ -56,7 +56,7 @@ public class SequentialFileTptBenchmark {
case Mapped: case Mapped:
final int fileSize = Math.max(msgSize * measurements, msgSize * warmup); final int fileSize = Math.max(msgSize * measurements, msgSize * warmup);
factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync); factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
break; break;
case Nio: case Nio:
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);

View File

@ -551,7 +551,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setJournalFileSize(getTextBytesAsIntBytes(e, "journal-file-size", config.getJournalFileSize(), Validators.GT_ZERO)); config.setJournalFileSize(getTextBytesAsIntBytes(e, "journal-file-size", config.getJournalFileSize(), Validators.GT_ZERO));
int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GT_ZERO); int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GE_ZERO);
int journalBufferSize = getTextBytesAsIntBytes(e, "journal-buffer-size", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, Validators.GT_ZERO); int journalBufferSize = getTextBytesAsIntBytes(e, "journal-buffer-size", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, Validators.GT_ZERO);

View File

@ -65,6 +65,11 @@ import org.apache.activemq.artemis.utils.IDGenerator;
*/ */
public interface StorageManager extends IDGenerator, ActiveMQComponent { public interface StorageManager extends IDGenerator, ActiveMQComponent {
default long getMaxRecordSize() {
/** Null journal is pretty much memory */
return Long.MAX_VALUE;
}
void criticalError(Throwable error); void criticalError(Throwable error);
/** /**

View File

@ -228,6 +228,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, this); idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, this);
} }
@Override
public long getMaxRecordSize() {
return messageJournal.getMaxRecordSize();
}
/** /**
* Called during initialization. Used by implementations to setup Journals, Stores etc... * Called during initialization. Used by implementations to setup Journals, Stores etc...
* *

View File

@ -141,7 +141,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
break; break;
case MAPPED: case MAPPED:
ActiveMQServerLogger.LOGGER.journalUseMAPPED(); ActiveMQServerLogger.LOGGER.journalUseMAPPED();
journalFF = MappedSequentialFileFactory.buffered(config.getJournalLocation(), config.getJournalFileSize(), config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener); journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), config.getJournalFileSize(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener);
break; break;
default: default:
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());

View File

@ -101,7 +101,7 @@ public class InVMConnector extends AbstractConnector {
public static synchronized void resetThreadPool() { public static synchronized void resetThreadPool() {
if (threadPoolExecutor != null) { if (threadPoolExecutor != null) {
threadPoolExecutor.shutdown(); threadPoolExecutor.shutdownNow();
threadPoolExecutor = null; threadPoolExecutor = null;
} }
} }

View File

@ -1333,7 +1333,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
boolean noAutoCreateQueue) throws Exception { boolean noAutoCreateQueue) throws Exception {
final Message message; final Message message;
if ((msg.getEncodeSize() > storageManager.getMessageJournal().getMaxRecordSize()) && !msg.isLargeMessage()) { if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) && !msg.isLargeMessage()) {
message = messageToLargeMessage(msg); message = messageToLargeMessage(msg);
} else { } else {
message = msg; message = msg;

View File

@ -74,7 +74,7 @@ Name | Description
[journal-compact-percentage](persistence.md) | The percentage of live data on which we consider compacting the journal. Default=30 [journal-compact-percentage](persistence.md) | The percentage of live data on which we consider compacting the journal. Default=30
[journal-directory](persistence.md) | the directory to store the journal files in. Default=data/journal [journal-directory](persistence.md) | the directory to store the journal files in. Default=data/journal
[journal-file-size](persistence.md) | the size (in bytes) of each journal file. Default=10485760 (10 MB) [journal-file-size](persistence.md) | the size (in bytes) of each journal file. Default=10485760 (10 MB)
[journal-max-io](persistence.md#configuring.message.journal.journal-max-io) | the maximum number of write requests that can be in the AIO queue at any one time. Default is 500 for AIO and 1 for NIO, ignored for MAPPED. [journal-max-io](persistence.md#configuring.message.journal.journal-max-io) | the maximum number of write requests that can be in the AIO queue at any one time. Default is 4096 for AIO and 1 for NIO, ignored for MAPPED.
[journal-min-files](persistence.md#configuring.message.journal.journal-min-files) | how many journal files to pre-create. Default=2 [journal-min-files](persistence.md#configuring.message.journal.journal-min-files) | how many journal files to pre-create. Default=2
[journal-pool-files](persistence.md#configuring.message.journal.journal-pool-files) | The upper theshold of the journal file pool,-1 (default) means no Limit. The system will create as many files as needed however when reclaiming files it will shrink back to the `journal-pool-files` [journal-pool-files](persistence.md#configuring.message.journal.journal-pool-files) | The upper theshold of the journal file pool,-1 (default) means no Limit. The system will create as many files as needed however when reclaiming files it will shrink back to the `journal-pool-files`
[journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true [journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
/**
* A RealJournalImplTest
* you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
* If you are running this test in eclipse you should do:
* I - Run->Open Run Dialog
* II - Find the class on the list (you will find it if you already tried running this testcase before)
* III - Add -Djava.library.path=<your project place>/native/src/.libs
*/
public class AIOUnbuferedJournalImplTest extends JournalImplTestUnit {
@BeforeClass
public static void hasAIO() {
org.junit.Assume.assumeTrue("Test case needs AIO to run", AIOSequentialFileFactory.isSupported());
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
if (!LibaioContext.isLoaded()) {
Assert.fail(String.format("libAIO is not loaded on %s %s %s", System.getProperty("os.name"), System.getProperty("os.arch"), System.getProperty("os.version")));
}
}
@Override
protected SequentialFileFactory getFileFactory() throws Exception {
File file = new File(getTestDir());
deleteDirectory(file);
file.mkdir();
// forcing the alignment to be 512, as this test was hard coded around this size.
return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 0, 10, false).setAlignment(512);
}
@Override
protected int getAlignment() {
return 512;
}
}

View File

@ -42,7 +42,7 @@ public class MappedImportExportTest extends NIOImportExportTest {
@Override @Override
protected SequentialFileFactory getFileFactory() throws Exception { protected SequentialFileFactory getFileFactory() throws Exception {
return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 4096, null); return new MappedSequentialFileFactory(getTestDirfile(), 10 * 4096, false, 0, 0, null);
} }
} }

View File

@ -50,6 +50,6 @@ public class MappedJournalCompactTest extends NIOJournalCompactTest {
file.mkdir(); file.mkdir();
return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 60 * 1024, null); return new MappedSequentialFileFactory(getTestDirfile(), 60 * 1024, false, 0, 0, null);
} }
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File; import java.io.File;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit; import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
@ -50,7 +51,7 @@ public class MappedJournalImplTest extends JournalImplTestUnit {
file.mkdir(); file.mkdir();
return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 1024, null); return new MappedSequentialFileFactory(getTestDirfile(), 10 * 1024, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, null);
} }
@Override @Override

View File

@ -34,7 +34,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa
@Override @Override
protected SequentialFileFactory createFactory(String folder) { protected SequentialFileFactory createFactory(String folder) {
return MappedSequentialFileFactory.unbuffered(new File(folder), 2048, null); return new MappedSequentialFileFactory(new File(folder), 2048, false, 0, 0, null);
} }
@Test @Test
@ -58,7 +58,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa
}; };
final AtomicInteger calls = new AtomicInteger(0); final AtomicInteger calls = new AtomicInteger(0);
final MappedSequentialFileFactory factory = MappedSequentialFileFactory.unbuffered(new File(getTestDir()), fakeEncoding.getEncodeSize(), (code, message, file) -> { final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), fakeEncoding.getEncodeSize(), false, 0, 0, (code, message, file) -> {
new Exception("shutdown").printStackTrace(); new Exception("shutdown").printStackTrace();
calls.incrementAndGet(); calls.incrementAndGet();
}); });

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
public class MappedUnbuferedJournalImplTest extends JournalImplTestUnit {
@Override
protected void setup(int minFreeFiles, int fileSize, boolean sync) {
super.setup(minFreeFiles, fileSize, sync);
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
}
@Override
protected void setup(int minFreeFiles, int fileSize, boolean sync, int maxAIO) {
super.setup(minFreeFiles, fileSize, sync, maxAIO);
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
}
@Override
protected void setup(int minFreeFiles, int poolSize, int fileSize, boolean sync, int maxAIO) {
super.setup(minFreeFiles, poolSize, fileSize, sync, maxAIO);
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
}
@Override
protected SequentialFileFactory getFileFactory() throws Exception {
File file = new File(getTestDir());
deleteDirectory(file);
file.mkdir();
return new MappedSequentialFileFactory(getTestDirfile(), 10 * 1024, false, 0, 0, null);
}
@Override
protected int getAlignment() {
return fileFactory.getAlignment();
}
}

View File

@ -352,7 +352,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
} else if (factoryType.equals("nio2")) { } else if (factoryType.equals("nio2")) {
return new NIOSequentialFileFactory(new File(directory), true, 1); return new NIOSequentialFileFactory(new File(directory), true, 1);
} else if (factoryType.equals("mmap")) { } else if (factoryType.equals("mmap")) {
return MappedSequentialFileFactory.unbuffered(new File(directory), fileSize, null); return new MappedSequentialFileFactory(new File(directory), fileSize, false, 0, 0, null);
} else { } else {
return new NIOSequentialFileFactory(new File(directory), false, 1); return new NIOSequentialFileFactory(new File(directory), false, 1);
} }