ARTEMIS-1452 Improvements to IO parameters and options
- it is now possible to disable the TimedBuffer - this is increasing the default on libaio maxAIO to 4k - The Auto Tuning on the journal will use asynchronous writes to simulate what would happen on faster disks - If you set datasync=false on the CLI, the system will suggest mapped and disable the buffer timeout This closes #1436 This commit superseeds #1436 since it's now disabling the timed buffer through the CLI
This commit is contained in:
parent
ba1323c8b2
commit
d190b611be
|
@ -38,6 +38,7 @@ import java.util.regex.Pattern;
|
||||||
import io.airlift.airline.Arguments;
|
import io.airlift.airline.Arguments;
|
||||||
import io.airlift.airline.Command;
|
import io.airlift.airline.Command;
|
||||||
import io.airlift.airline.Option;
|
import io.airlift.airline.Option;
|
||||||
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.cli.CLIException;
|
import org.apache.activemq.artemis.cli.CLIException;
|
||||||
import org.apache.activemq.artemis.cli.commands.util.HashUtil;
|
import org.apache.activemq.artemis.cli.commands.util.HashUtil;
|
||||||
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
||||||
|
@ -723,6 +724,16 @@ public class Create extends InputAbstract {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupJournalType() {
|
private void setupJournalType() {
|
||||||
|
|
||||||
|
if (noJournalSync && !mapped) {
|
||||||
|
boolean useMapped = inputBoolean("--mapped", "Since you disabled syncs, it is recommended to use the Mapped Memory Journal. Do you want to use the Memory Mapped Journal", true);
|
||||||
|
|
||||||
|
if (useMapped) {
|
||||||
|
mapped = true;
|
||||||
|
nio = false;
|
||||||
|
aio = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
int countJournalTypes = countBoolean(aio, nio, mapped);
|
int countJournalTypes = countBoolean(aio, nio, mapped);
|
||||||
if (countJournalTypes > 1) {
|
if (countJournalTypes > 1) {
|
||||||
throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped).");
|
throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped).");
|
||||||
|
@ -803,20 +814,34 @@ public class Create extends InputAbstract {
|
||||||
System.out.println("");
|
System.out.println("");
|
||||||
System.out.println("Auto tuning journal ...");
|
System.out.println("Auto tuning journal ...");
|
||||||
|
|
||||||
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, journalType);
|
if (mapped && noJournalSync) {
|
||||||
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
|
HashMap<String, String> syncFilter = new HashMap<>();
|
||||||
double writesPerMillisecond = (double) writes / (double) time;
|
syncFilter.put("${nanoseconds}", "0");
|
||||||
|
syncFilter.put("${writesPerMillisecond}", "0");
|
||||||
|
syncFilter.put("${maxaio}", journalType == JournalType.ASYNCIO ? "" + ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio() : "1");
|
||||||
|
|
||||||
String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond);
|
System.out.println("...Since you disabled sync and are using MAPPED journal, we are diabling buffer times");
|
||||||
|
|
||||||
HashMap<String, String> syncFilter = new HashMap<>();
|
filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
|
||||||
syncFilter.put("${nanoseconds}", Long.toString(nanoseconds));
|
|
||||||
syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr);
|
|
||||||
|
|
||||||
System.out.println("done! Your system can make " + writesPerMillisecondStr +
|
} else {
|
||||||
" writes per millisecond, your journal-buffer-timeout will be " + nanoseconds);
|
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), journalType);
|
||||||
|
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
|
||||||
|
double writesPerMillisecond = (double) writes / (double) time;
|
||||||
|
|
||||||
|
String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond);
|
||||||
|
|
||||||
|
HashMap<String, String> syncFilter = new HashMap<>();
|
||||||
|
syncFilter.put("${nanoseconds}", Long.toString(nanoseconds));
|
||||||
|
syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr);
|
||||||
|
syncFilter.put("${maxaio}", journalType == JournalType.ASYNCIO ? "" + ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio() : "1");
|
||||||
|
|
||||||
|
System.out.println("done! Your system can make " + writesPerMillisecondStr +
|
||||||
|
" writes per millisecond, your journal-buffer-timeout will be " + nanoseconds);
|
||||||
|
|
||||||
|
filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
|
||||||
|
}
|
||||||
|
|
||||||
filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter));
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
filters.put("${journal-buffer.settings}", "");
|
filters.put("${journal-buffer.settings}", "");
|
||||||
|
|
|
@ -22,13 +22,13 @@ import java.text.DecimalFormat;
|
||||||
import io.airlift.airline.Command;
|
import io.airlift.airline.Command;
|
||||||
import io.airlift.airline.Option;
|
import io.airlift.airline.Option;
|
||||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||||
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
|
import org.apache.activemq.artemis.cli.commands.tools.OptionalLocking;
|
||||||
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
||||||
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
|
|
||||||
@Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder")
|
@Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder")
|
||||||
public class PerfJournal extends LockAbstract {
|
public class PerfJournal extends OptionalLocking {
|
||||||
|
|
||||||
|
|
||||||
@Option(name = "--block-size", description = "The block size for each write (default 4096)")
|
@Option(name = "--block-size", description = "The block size for each write (default 4096)")
|
||||||
|
@ -49,6 +49,15 @@ public class PerfJournal extends LockAbstract {
|
||||||
@Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)")
|
@Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)")
|
||||||
public String journalType = null;
|
public String journalType = null;
|
||||||
|
|
||||||
|
@Option(name = "--sync-writes", description = "It will perform each write synchronously, like if you had a single producer")
|
||||||
|
public boolean syncWrites = false;
|
||||||
|
|
||||||
|
@Option(name = "--file", description = "The file name to be used (default test.tmp)")
|
||||||
|
public String fileName = "test.tmp";
|
||||||
|
|
||||||
|
@Option(name = "--max-aio", description = "libaio.maxAIO to be used (default: configuration::getJournalMaxIO_AIO()")
|
||||||
|
public int maxAIO = 0;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object execute(ActionContext context) throws Exception {
|
public Object execute(ActionContext context) throws Exception {
|
||||||
|
@ -74,7 +83,11 @@ public class PerfJournal extends LockAbstract {
|
||||||
|
|
||||||
fileConfiguration.getJournalLocation().mkdirs();
|
fileConfiguration.getJournalLocation().mkdirs();
|
||||||
|
|
||||||
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType());
|
if (maxAIO <= 0) {
|
||||||
|
maxAIO = fileConfiguration.getJournalMaxIO_AIO();
|
||||||
|
}
|
||||||
|
|
||||||
|
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), syncWrites, fileName, maxAIO, fileConfiguration.getJournalType());
|
||||||
|
|
||||||
long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
|
long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
|
||||||
double writesPerMillisecond = (double) writes / (double) time;
|
double writesPerMillisecond = (double) writes / (double) time;
|
||||||
|
|
|
@ -50,13 +50,16 @@ public class SyncCalculation {
|
||||||
int tries,
|
int tries,
|
||||||
boolean verbose,
|
boolean verbose,
|
||||||
boolean fsync,
|
boolean fsync,
|
||||||
|
boolean syncWrites,
|
||||||
|
String fileName,
|
||||||
|
int maxAIO,
|
||||||
JournalType journalType) throws Exception {
|
JournalType journalType) throws Exception {
|
||||||
SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks);
|
SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks, maxAIO);
|
||||||
|
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
System.out.println("Using " + factory.getClass().getName() + " to calculate sync times");
|
System.out.println("Using " + factory.getClass().getName() + " to calculate sync times, alignment=" + factory.getAlignment());
|
||||||
}
|
}
|
||||||
SequentialFile file = factory.createSequentialFile("test.tmp");
|
SequentialFile file = factory.createSequentialFile(fileName);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
file.delete();
|
file.delete();
|
||||||
|
@ -106,10 +109,14 @@ public class SyncCalculation {
|
||||||
bufferBlock.position(0);
|
bufferBlock.position(0);
|
||||||
latch.countUp();
|
latch.countUp();
|
||||||
file.writeDirect(bufferBlock, true, callback);
|
file.writeDirect(bufferBlock, true, callback);
|
||||||
if (!latch.await(5, TimeUnit.SECONDS)) {
|
|
||||||
throw new IOException("Callback wasn't called");
|
if (syncWrites) {
|
||||||
|
flushLatch(latch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!syncWrites) flushLatch(latch);
|
||||||
|
|
||||||
long end = System.currentTimeMillis();
|
long end = System.currentTimeMillis();
|
||||||
|
|
||||||
result[ntry] = (end - start);
|
result[ntry] = (end - start);
|
||||||
|
@ -150,6 +157,12 @@ public class SyncCalculation {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void flushLatch(ReusableLatch latch) throws InterruptedException, IOException {
|
||||||
|
if (!latch.await(5, TimeUnit.SECONDS)) {
|
||||||
|
throw new IOException("Timed out on receiving IO callback");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static long toNanos(long time, long blocks, boolean verbose) {
|
public static long toNanos(long time, long blocks, boolean verbose) {
|
||||||
|
|
||||||
double blocksPerMillisecond = (double) blocks / (double) (time);
|
double blocksPerMillisecond = (double) blocks / (double) (time);
|
||||||
|
@ -169,7 +182,7 @@ public class SyncCalculation {
|
||||||
return timeWait;
|
return timeWait;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize) {
|
private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize, int maxAIO) {
|
||||||
SequentialFileFactory factory;
|
SequentialFileFactory factory;
|
||||||
|
|
||||||
if (journalType == JournalType.ASYNCIO && !LibaioContext.isLoaded()) {
|
if (journalType == JournalType.ASYNCIO && !LibaioContext.isLoaded()) {
|
||||||
|
@ -184,12 +197,12 @@ public class SyncCalculation {
|
||||||
factory.start();
|
factory.start();
|
||||||
return factory;
|
return factory;
|
||||||
case ASYNCIO:
|
case ASYNCIO:
|
||||||
factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
|
factory = new AIOSequentialFileFactory(datafolder, maxAIO).setDatasync(datasync);
|
||||||
factory.start();
|
factory.start();
|
||||||
((AIOSequentialFileFactory) factory).disableBufferReuse();
|
((AIOSequentialFileFactory) factory).disableBufferReuse();
|
||||||
return factory;
|
return factory;
|
||||||
case MAPPED:
|
case MAPPED:
|
||||||
factory = MappedSequentialFileFactory.unbuffered(datafolder, fileSize, null)
|
factory = new MappedSequentialFileFactory(datafolder, fileSize, false, 0, 0, null)
|
||||||
.setDatasync(datasync)
|
.setDatasync(datasync)
|
||||||
.disableBufferReuse();
|
.disableBufferReuse();
|
||||||
factory.start();
|
factory.start();
|
||||||
|
|
|
@ -50,7 +50,8 @@ under the License.
|
||||||
|
|
||||||
<journal-pool-files>-1</journal-pool-files>
|
<journal-pool-files>-1</journal-pool-files>
|
||||||
|
|
||||||
${ping-config.settings}${journal-buffer.settings}${connector-config.settings}
|
<journal-file-size>10M</journal-file-size>
|
||||||
|
${journal-buffer.settings}${ping-config.settings}${connector-config.settings}
|
||||||
|
|
||||||
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
||||||
<disk-scan-period>5000</disk-scan-period>
|
<disk-scan-period>5000</disk-scan-period>
|
||||||
|
|
|
@ -3,6 +3,15 @@
|
||||||
This value was determined through a calculation.
|
This value was determined through a calculation.
|
||||||
Your system could perform ${writesPerMillisecond} writes per millisecond
|
Your system could perform ${writesPerMillisecond} writes per millisecond
|
||||||
on the current journal configuration.
|
on the current journal configuration.
|
||||||
That translates as a sync write every ${nanoseconds} nanoseconds
|
That translates as a sync write every ${nanoseconds} nanoseconds.
|
||||||
|
|
||||||
|
Note: If you specify 0 the system will perform writes directly to the disk.
|
||||||
|
We recommend this to be 0 if you are using journalType=MAPPED and ournal-datasync=false.
|
||||||
-->
|
-->
|
||||||
<journal-buffer-timeout>${nanoseconds}</journal-buffer-timeout>
|
<journal-buffer-timeout>${nanoseconds}</journal-buffer-timeout>
|
||||||
|
|
||||||
|
|
||||||
|
<!--
|
||||||
|
When using ASYNCIO, this will determine the writing queue depth for libaio.
|
||||||
|
-->
|
||||||
|
<journal-max-io>${maxaio}</journal-max-io>
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
|
||||||
<!--
|
<!--
|
||||||
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
|
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
|
||||||
<network-check-NIC>theNicName</network-check-NIC>
|
<network-check-NIC>theNicName</network-check-NIC>
|
||||||
|
|
|
@ -123,7 +123,7 @@ public class ArtemisTest extends CliTestBase {
|
||||||
public void testSync() throws Exception {
|
public void testSync() throws Exception {
|
||||||
int writes = 2;
|
int writes = 2;
|
||||||
int tries = 5;
|
int tries = 5;
|
||||||
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, JournalType.NIO);
|
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true, "file.tmp", 1, JournalType.NIO);
|
||||||
System.out.println();
|
System.out.println();
|
||||||
System.out.println("TotalAvg = " + totalAvg);
|
System.out.println("TotalAvg = " + totalAvg);
|
||||||
long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false);
|
long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false);
|
||||||
|
|
|
@ -103,7 +103,7 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
|
|
||||||
// These defaults are applied depending on whether the journal type
|
// These defaults are applied depending on whether the journal type
|
||||||
// is NIO or AIO.
|
// is NIO or AIO.
|
||||||
private static int DEFAULT_JOURNAL_MAX_IO_AIO = 500;
|
private static int DEFAULT_JOURNAL_MAX_IO_AIO = 4096;
|
||||||
private static int DEFAULT_JOURNAL_POOL_FILES = -1;
|
private static int DEFAULT_JOURNAL_POOL_FILES = -1;
|
||||||
private static int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO;
|
private static int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO;
|
||||||
private static int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO;
|
private static int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO;
|
||||||
|
|
|
@ -59,7 +59,7 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
|
||||||
|
|
||||||
protected volatile int alignment = -1;
|
protected volatile int alignment = -1;
|
||||||
|
|
||||||
private final IOCriticalErrorListener critialErrorListener;
|
protected final IOCriticalErrorListener critialErrorListener;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Asynchronous writes need to be done at another executor.
|
* Asynchronous writes need to be done at another executor.
|
||||||
|
@ -88,6 +88,11 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
|
||||||
this.maxIO = maxIO;
|
this.maxIO = maxIO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getBufferSize() {
|
||||||
|
return bufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getAlignment() {
|
public int getAlignment() {
|
||||||
if (alignment < 0) {
|
if (alignment < 0) {
|
||||||
|
|
|
@ -269,10 +269,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getBufferSize() {
|
|
||||||
return bufferSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The same callback is used for Runnable executor.
|
* The same callback is used for Runnable executor.
|
||||||
|
@ -416,6 +412,16 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
|
|
||||||
private boolean stopped = false;
|
private boolean stopped = false;
|
||||||
|
|
||||||
|
private int alignedBufferSize = 0;
|
||||||
|
|
||||||
|
private int getAlignedBufferSize() {
|
||||||
|
if (alignedBufferSize == 0) {
|
||||||
|
alignedBufferSize = calculateBlockSize(bufferSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
return alignedBufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
public ByteBuffer newBuffer(final int size) {
|
public ByteBuffer newBuffer(final int size) {
|
||||||
// if a new buffer wasn't requested in 10 seconds, we clear the queue
|
// if a new buffer wasn't requested in 10 seconds, we clear the queue
|
||||||
// This is being done this way as we don't need another Timeout Thread
|
// This is being done this way as we don't need another Timeout Thread
|
||||||
|
@ -433,26 +439,32 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
|
|
||||||
// if a buffer is bigger than the configured-bufferSize, we just create a new
|
// if a buffer is bigger than the configured-bufferSize, we just create a new
|
||||||
// buffer.
|
// buffer.
|
||||||
if (size > bufferSize) {
|
if (size > getAlignedBufferSize()) {
|
||||||
return LibaioContext.newAlignedBuffer(size, getAlignment());
|
return LibaioContext.newAlignedBuffer(size, getAlignment());
|
||||||
} else {
|
} else {
|
||||||
// We need to allocate buffers following the rules of the storage
|
// We need to allocate buffers following the rules of the storage
|
||||||
// being used (AIO/NIO)
|
// being used (AIO/NIO)
|
||||||
int alignedSize = calculateBlockSize(size);
|
final int alignedSize;
|
||||||
|
|
||||||
|
if (size < getAlignedBufferSize()) {
|
||||||
|
alignedSize = getAlignedBufferSize();
|
||||||
|
} else {
|
||||||
|
alignedSize = calculateBlockSize(size);
|
||||||
|
}
|
||||||
|
|
||||||
// Try getting a buffer from the queue...
|
// Try getting a buffer from the queue...
|
||||||
ByteBuffer buffer = reuseBuffersQueue.poll();
|
ByteBuffer buffer = reuseBuffersQueue.poll();
|
||||||
|
|
||||||
if (buffer == null) {
|
if (buffer == null) {
|
||||||
// if empty create a new one.
|
// if empty create a new one.
|
||||||
buffer = LibaioContext.newAlignedBuffer(size, getAlignment());
|
buffer = LibaioContext.newAlignedBuffer(alignedSize, getAlignment());
|
||||||
|
|
||||||
buffer.limit(alignedSize);
|
buffer.limit(calculateBlockSize(size));
|
||||||
} else {
|
} else {
|
||||||
clearBuffer(buffer);
|
clearBuffer(buffer);
|
||||||
|
|
||||||
// set the limit of the buffer to the bufferSize being required
|
// set the limit of the buffer to the bufferSize being required
|
||||||
buffer.limit(alignedSize);
|
buffer.limit(calculateBlockSize(size));
|
||||||
}
|
}
|
||||||
|
|
||||||
buffer.rewind();
|
buffer.rewind();
|
||||||
|
@ -484,7 +496,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
|
|
||||||
// If a buffer has any other than the configured bufferSize, the buffer
|
// If a buffer has any other than the configured bufferSize, the buffer
|
||||||
// will be just sent to GC
|
// will be just sent to GC
|
||||||
if (buffer.capacity() == bufferSize) {
|
if (buffer.capacity() == getAlignedBufferSize()) {
|
||||||
reuseBuffersQueue.offer(buffer);
|
reuseBuffersQueue.offer(buffer);
|
||||||
} else {
|
} else {
|
||||||
releaseBuffer(buffer);
|
releaseBuffer(buffer);
|
||||||
|
|
|
@ -17,48 +17,33 @@
|
||||||
package org.apache.activemq.artemis.core.io.mapped;
|
package org.apache.activemq.artemis.core.io.mapped;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FilenameFilter;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
|
||||||
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
|
|
||||||
import org.apache.activemq.artemis.utils.Env;
|
import org.apache.activemq.artemis.utils.Env;
|
||||||
|
|
||||||
public final class MappedSequentialFileFactory implements SequentialFileFactory {
|
public final class MappedSequentialFileFactory extends AbstractSequentialFileFactory {
|
||||||
|
|
||||||
private final File directory;
|
|
||||||
private int capacity;
|
private int capacity;
|
||||||
private final IOCriticalErrorListener criticalErrorListener;
|
|
||||||
private final TimedBuffer timedBuffer;
|
|
||||||
private boolean useDataSync;
|
|
||||||
private boolean bufferPooling;
|
private boolean bufferPooling;
|
||||||
//pools only the biggest one -> optimized for the common case
|
//pools only the biggest one -> optimized for the common case
|
||||||
private final ThreadLocal<ByteBuffer> bytesPool;
|
private final ThreadLocal<ByteBuffer> bytesPool;
|
||||||
private final int bufferSize;
|
|
||||||
|
|
||||||
private MappedSequentialFileFactory(File directory,
|
public MappedSequentialFileFactory(File directory,
|
||||||
int capacity,
|
int capacity,
|
||||||
final boolean buffered,
|
final boolean buffered,
|
||||||
final int bufferSize,
|
final int bufferSize,
|
||||||
final int bufferTimeout,
|
final int bufferTimeout,
|
||||||
IOCriticalErrorListener criticalErrorListener) {
|
IOCriticalErrorListener criticalErrorListener) {
|
||||||
this.directory = directory;
|
|
||||||
|
super(directory, buffered, bufferSize, bufferTimeout, 1, false, criticalErrorListener);
|
||||||
|
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
this.criticalErrorListener = criticalErrorListener;
|
this.setDatasync(true);
|
||||||
this.useDataSync = true;
|
|
||||||
if (buffered && bufferTimeout > 0 && bufferSize > 0) {
|
|
||||||
timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, false);
|
|
||||||
} else {
|
|
||||||
timedBuffer = null;
|
|
||||||
}
|
|
||||||
this.bufferSize = bufferSize;
|
|
||||||
this.bufferPooling = true;
|
this.bufferPooling = true;
|
||||||
this.bytesPool = new ThreadLocal<>();
|
this.bytesPool = new ThreadLocal<>();
|
||||||
}
|
}
|
||||||
|
@ -72,23 +57,9 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
||||||
return capacity;
|
return capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MappedSequentialFileFactory buffered(File directory,
|
|
||||||
int capacity,
|
|
||||||
final int bufferSize,
|
|
||||||
final int bufferTimeout,
|
|
||||||
IOCriticalErrorListener criticalErrorListener) {
|
|
||||||
return new MappedSequentialFileFactory(directory, capacity, true, bufferSize, bufferTimeout, criticalErrorListener);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static MappedSequentialFileFactory unbuffered(File directory,
|
|
||||||
int capacity,
|
|
||||||
IOCriticalErrorListener criticalErrorListener) {
|
|
||||||
return new MappedSequentialFileFactory(directory, capacity, false, 0, 0, criticalErrorListener);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SequentialFile createSequentialFile(String fileName) {
|
public SequentialFile createSequentialFile(String fileName) {
|
||||||
final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), capacity, criticalErrorListener);
|
final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, journalDir, new File(journalDir, fileName), capacity, critialErrorListener);
|
||||||
if (this.timedBuffer == null) {
|
if (this.timedBuffer == null) {
|
||||||
return mappedSequentialFile;
|
return mappedSequentialFile;
|
||||||
} else {
|
} else {
|
||||||
|
@ -96,49 +67,11 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public MappedSequentialFileFactory setDatasync(boolean enabled) {
|
|
||||||
this.useDataSync = enabled;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDatasync() {
|
|
||||||
return useDataSync;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getBufferSize() {
|
|
||||||
return bufferSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMaxIO() {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<String> listFiles(final String extension) throws Exception {
|
|
||||||
final FilenameFilter extensionFilter = (file, name) -> name.endsWith("." + extension);
|
|
||||||
final String[] fileNames = directory.list(extensionFilter);
|
|
||||||
if (fileNames == null) {
|
|
||||||
return Collections.EMPTY_LIST;
|
|
||||||
}
|
|
||||||
return Arrays.asList(fileNames);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isSupportsCallbacks() {
|
public boolean isSupportsCallbacks() {
|
||||||
return timedBuffer != null;
|
return timedBuffer != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onIOError(Exception exception, String message, SequentialFile file) {
|
|
||||||
if (criticalErrorListener != null) {
|
|
||||||
criticalErrorListener.onIOException(exception, message, file);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer allocateDirectBuffer(final int size) {
|
public ByteBuffer allocateDirectBuffer(final int size) {
|
||||||
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
|
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
|
||||||
|
@ -204,20 +137,11 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void activateBuffer(SequentialFile file) {
|
public MappedSequentialFileFactory setDatasync(boolean enabled) {
|
||||||
if (timedBuffer != null) {
|
super.setDatasync(enabled);
|
||||||
file.setTimedBuffer(timedBuffer);
|
return this;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void deactivateBuffer() {
|
|
||||||
if (timedBuffer != null) {
|
|
||||||
// When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
|
|
||||||
timedBuffer.flush();
|
|
||||||
timedBuffer.setObserver(null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer wrapBuffer(final byte[] bytes) {
|
public ByteBuffer wrapBuffer(final byte[] bytes) {
|
||||||
|
@ -240,11 +164,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public File getDirectory() {
|
|
||||||
return this.directory;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clearBuffer(final ByteBuffer buffer) {
|
public void clearBuffer(final ByteBuffer buffer) {
|
||||||
if (buffer.isDirect()) {
|
if (buffer.isDirect()) {
|
||||||
|
@ -276,18 +195,4 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void createDirs() throws Exception {
|
|
||||||
boolean ok = directory.mkdirs();
|
|
||||||
if (!ok) {
|
|
||||||
throw new IOException("Failed to create directory " + directory);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void flush() {
|
|
||||||
if (timedBuffer != null) {
|
|
||||||
timedBuffer.flush();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -203,9 +203,4 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getBufferSize() {
|
|
||||||
return bufferSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ public class JournalTptBenchmark {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
|
||||||
case Mapped:
|
case Mapped:
|
||||||
factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null)
|
factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null)
|
||||||
.setDatasync(dataSync);
|
.setDatasync(dataSync);
|
||||||
break;
|
break;
|
||||||
case Nio:
|
case Nio:
|
||||||
|
|
|
@ -56,7 +56,7 @@ public class SequentialFileTptBenchmark {
|
||||||
|
|
||||||
case Mapped:
|
case Mapped:
|
||||||
final int fileSize = Math.max(msgSize * measurements, msgSize * warmup);
|
final int fileSize = Math.max(msgSize * measurements, msgSize * warmup);
|
||||||
factory = MappedSequentialFileFactory.buffered(tmpDirectory, fileSize, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
|
factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
|
||||||
break;
|
break;
|
||||||
case Nio:
|
case Nio:
|
||||||
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
|
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
|
||||||
|
|
|
@ -551,7 +551,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
config.setJournalFileSize(getTextBytesAsIntBytes(e, "journal-file-size", config.getJournalFileSize(), Validators.GT_ZERO));
|
config.setJournalFileSize(getTextBytesAsIntBytes(e, "journal-file-size", config.getJournalFileSize(), Validators.GT_ZERO));
|
||||||
|
|
||||||
int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GT_ZERO);
|
int journalBufferTimeout = getInteger(e, "journal-buffer-timeout", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, Validators.GE_ZERO);
|
||||||
|
|
||||||
int journalBufferSize = getTextBytesAsIntBytes(e, "journal-buffer-size", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, Validators.GT_ZERO);
|
int journalBufferSize = getTextBytesAsIntBytes(e, "journal-buffer-size", config.getJournalType() == JournalType.ASYNCIO ? ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO : ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, Validators.GT_ZERO);
|
||||||
|
|
||||||
|
|
|
@ -229,6 +229,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
public long getMaxRecordSize() {
|
public long getMaxRecordSize() {
|
||||||
return messageJournal.getMaxRecordSize();
|
return messageJournal.getMaxRecordSize();
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,7 +141,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
break;
|
break;
|
||||||
case MAPPED:
|
case MAPPED:
|
||||||
ActiveMQServerLogger.LOGGER.journalUseMAPPED();
|
ActiveMQServerLogger.LOGGER.journalUseMAPPED();
|
||||||
journalFF = MappedSequentialFileFactory.buffered(config.getJournalLocation(), config.getJournalFileSize(), config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener);
|
journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), config.getJournalFileSize(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), criticalErrorListener);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
|
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
|
||||||
|
|
|
@ -74,7 +74,7 @@ Name | Description
|
||||||
[journal-compact-percentage](persistence.md) | The percentage of live data on which we consider compacting the journal. Default=30
|
[journal-compact-percentage](persistence.md) | The percentage of live data on which we consider compacting the journal. Default=30
|
||||||
[journal-directory](persistence.md) | the directory to store the journal files in. Default=data/journal
|
[journal-directory](persistence.md) | the directory to store the journal files in. Default=data/journal
|
||||||
[journal-file-size](persistence.md) | the size (in bytes) of each journal file. Default=10485760 (10 MB)
|
[journal-file-size](persistence.md) | the size (in bytes) of each journal file. Default=10485760 (10 MB)
|
||||||
[journal-max-io](persistence.md#configuring.message.journal.journal-max-io) | the maximum number of write requests that can be in the AIO queue at any one time. Default is 500 for AIO and 1 for NIO, ignored for MAPPED.
|
[journal-max-io](persistence.md#configuring.message.journal.journal-max-io) | the maximum number of write requests that can be in the AIO queue at any one time. Default is 4096 for AIO and 1 for NIO, ignored for MAPPED.
|
||||||
[journal-min-files](persistence.md#configuring.message.journal.journal-min-files) | how many journal files to pre-create. Default=2
|
[journal-min-files](persistence.md#configuring.message.journal.journal-min-files) | how many journal files to pre-create. Default=2
|
||||||
[journal-pool-files](persistence.md#configuring.message.journal.journal-pool-files) | The upper theshold of the journal file pool,-1 (default) means no Limit. The system will create as many files as needed however when reclaiming files it will shrink back to the `journal-pool-files`
|
[journal-pool-files](persistence.md#configuring.message.journal.journal-pool-files) | The upper theshold of the journal file pool,-1 (default) means no Limit. The system will create as many files as needed however when reclaiming files it will shrink back to the `journal-pool-files`
|
||||||
[journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true
|
[journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.journal;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.ArtemisConstants;
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||||
|
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A RealJournalImplTest
|
||||||
|
* you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
|
||||||
|
* If you are running this test in eclipse you should do:
|
||||||
|
* I - Run->Open Run Dialog
|
||||||
|
* II - Find the class on the list (you will find it if you already tried running this testcase before)
|
||||||
|
* III - Add -Djava.library.path=<your project place>/native/src/.libs
|
||||||
|
*/
|
||||||
|
public class AIOUnbuferedJournalImplTest extends JournalImplTestUnit {
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void hasAIO() {
|
||||||
|
org.junit.Assume.assumeTrue("Test case needs AIO to run", AIOSequentialFileFactory.isSupported());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
if (!LibaioContext.isLoaded()) {
|
||||||
|
Assert.fail(String.format("libAIO is not loaded on %s %s %s", System.getProperty("os.name"), System.getProperty("os.arch"), System.getProperty("os.version")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SequentialFileFactory getFileFactory() throws Exception {
|
||||||
|
File file = new File(getTestDir());
|
||||||
|
|
||||||
|
deleteDirectory(file);
|
||||||
|
|
||||||
|
file.mkdir();
|
||||||
|
|
||||||
|
// forcing the alignment to be 512, as this test was hard coded around this size.
|
||||||
|
return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 0, 10, false).setAlignment(512);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getAlignment() {
|
||||||
|
return 512;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -42,7 +42,7 @@ public class MappedImportExportTest extends NIOImportExportTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected SequentialFileFactory getFileFactory() throws Exception {
|
protected SequentialFileFactory getFileFactory() throws Exception {
|
||||||
return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 4096, null);
|
return new MappedSequentialFileFactory(getTestDirfile(), 10 * 4096, false, 0, 0, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,6 @@ public class MappedJournalCompactTest extends NIOJournalCompactTest {
|
||||||
|
|
||||||
file.mkdir();
|
file.mkdir();
|
||||||
|
|
||||||
return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 60 * 1024, null);
|
return new MappedSequentialFileFactory(getTestDirfile(), 60 * 1024, false, 0, 0, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.journal;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.ArtemisConstants;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
|
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
|
||||||
|
@ -50,7 +51,7 @@ public class MappedJournalImplTest extends JournalImplTestUnit {
|
||||||
|
|
||||||
file.mkdir();
|
file.mkdir();
|
||||||
|
|
||||||
return MappedSequentialFileFactory.unbuffered(getTestDirfile(), 10 * 1024, null);
|
return new MappedSequentialFileFactory(getTestDirfile(), 10 * 1024, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -34,7 +34,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected SequentialFileFactory createFactory(String folder) {
|
protected SequentialFileFactory createFactory(String folder) {
|
||||||
return MappedSequentialFileFactory.unbuffered(new File(folder), 2048, null);
|
return new MappedSequentialFileFactory(new File(folder), 2048, false, 0, 0, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -58,7 +58,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa
|
||||||
};
|
};
|
||||||
|
|
||||||
final AtomicInteger calls = new AtomicInteger(0);
|
final AtomicInteger calls = new AtomicInteger(0);
|
||||||
final MappedSequentialFileFactory factory = MappedSequentialFileFactory.unbuffered(new File(getTestDir()), fakeEncoding.getEncodeSize(), (code, message, file) -> {
|
final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), fakeEncoding.getEncodeSize(), false, 0, 0, (code, message, file) -> {
|
||||||
new Exception("shutdown").printStackTrace();
|
new Exception("shutdown").printStackTrace();
|
||||||
calls.incrementAndGet();
|
calls.incrementAndGet();
|
||||||
});
|
});
|
||||||
|
|
|
@ -0,0 +1,61 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.journal;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
|
||||||
|
|
||||||
|
public class MappedUnbuferedJournalImplTest extends JournalImplTestUnit {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setup(int minFreeFiles, int fileSize, boolean sync) {
|
||||||
|
super.setup(minFreeFiles, fileSize, sync);
|
||||||
|
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setup(int minFreeFiles, int fileSize, boolean sync, int maxAIO) {
|
||||||
|
super.setup(minFreeFiles, fileSize, sync, maxAIO);
|
||||||
|
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setup(int minFreeFiles, int poolSize, int fileSize, boolean sync, int maxAIO) {
|
||||||
|
super.setup(minFreeFiles, poolSize, fileSize, sync, maxAIO);
|
||||||
|
((MappedSequentialFileFactory) this.fileFactory).capacity(fileSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SequentialFileFactory getFileFactory() throws Exception {
|
||||||
|
File file = new File(getTestDir());
|
||||||
|
|
||||||
|
deleteDirectory(file);
|
||||||
|
|
||||||
|
file.mkdir();
|
||||||
|
|
||||||
|
return new MappedSequentialFileFactory(getTestDirfile(), 10 * 1024, false, 0, 0, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getAlignment() {
|
||||||
|
return fileFactory.getAlignment();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -352,7 +352,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
|
||||||
} else if (factoryType.equals("nio2")) {
|
} else if (factoryType.equals("nio2")) {
|
||||||
return new NIOSequentialFileFactory(new File(directory), true, 1);
|
return new NIOSequentialFileFactory(new File(directory), true, 1);
|
||||||
} else if (factoryType.equals("mmap")) {
|
} else if (factoryType.equals("mmap")) {
|
||||||
return MappedSequentialFileFactory.unbuffered(new File(directory), fileSize, null);
|
return new MappedSequentialFileFactory(new File(directory), fileSize, false, 0, 0, null);
|
||||||
} else {
|
} else {
|
||||||
return new NIOSequentialFileFactory(new File(directory), false, 1);
|
return new NIOSequentialFileFactory(new File(directory), false, 1);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue