ARTEMIS-937 Implementing proper alignment and adding perf-journal tool to validate the journal syncs
This commit is contained in:
parent
57038ff47e
commit
ce035a8084
|
@ -50,7 +50,7 @@ import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal;
|
||||||
import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal;
|
import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal;
|
||||||
import org.apache.activemq.artemis.cli.commands.tools.HelpData;
|
import org.apache.activemq.artemis.cli.commands.tools.HelpData;
|
||||||
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
|
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
|
||||||
import org.apache.activemq.artemis.cli.commands.tools.SyncRecalc;
|
import org.apache.activemq.artemis.cli.commands.tools.PerfJournal;
|
||||||
import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter;
|
import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter;
|
||||||
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
|
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
|
||||||
import org.apache.activemq.artemis.cli.commands.user.AddUser;
|
import org.apache.activemq.artemis.cli.commands.user.AddUser;
|
||||||
|
@ -163,7 +163,7 @@ public class Artemis {
|
||||||
withDefaultCommand(HelpData.class).withCommands(PrintData.class, XmlDataExporter.class, XmlDataImporter.class, DecodeJournal.class, EncodeJournal.class, CompactJournal.class);
|
withDefaultCommand(HelpData.class).withCommands(PrintData.class, XmlDataExporter.class, XmlDataImporter.class, DecodeJournal.class, EncodeJournal.class, CompactJournal.class);
|
||||||
builder.withGroup("user").withDescription("default file-based user management (add|rm|list|reset) (example ./artemis user list)").
|
builder.withGroup("user").withDescription("default file-based user management (add|rm|list|reset) (example ./artemis user list)").
|
||||||
withDefaultCommand(HelpUser.class).withCommands(ListUser.class, AddUser.class, RemoveUser.class, ResetUser.class);
|
withDefaultCommand(HelpUser.class).withCommands(ListUser.class, AddUser.class, RemoveUser.class, ResetUser.class);
|
||||||
builder = builder.withCommands(Run.class, Stop.class, Kill.class, SyncRecalc.class);
|
builder = builder.withCommands(Run.class, Stop.class, Kill.class, PerfJournal.class);
|
||||||
} else {
|
} else {
|
||||||
builder.withGroup("data").withDescription("data tools group (print) (example ./artemis data print)").
|
builder.withGroup("data").withDescription("data tools group (print) (example ./artemis data print)").
|
||||||
withDefaultCommand(HelpData.class).withCommands(PrintData.class);
|
withDefaultCommand(HelpData.class).withCommands(PrintData.class);
|
||||||
|
|
|
@ -40,6 +40,7 @@ import io.airlift.airline.Option;
|
||||||
import org.apache.activemq.artemis.cli.CLIException;
|
import org.apache.activemq.artemis.cli.CLIException;
|
||||||
import org.apache.activemq.artemis.cli.commands.util.HashUtil;
|
import org.apache.activemq.artemis.cli.commands.util.HashUtil;
|
||||||
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
||||||
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||||
import org.apache.activemq.artemis.jlibaio.LibaioFile;
|
import org.apache.activemq.artemis.jlibaio.LibaioFile;
|
||||||
|
@ -209,11 +210,14 @@ public class Create extends InputAbstract {
|
||||||
@Option(name = "--addresses", description = "comma separated list of addresses ")
|
@Option(name = "--addresses", description = "comma separated list of addresses ")
|
||||||
String addresses;
|
String addresses;
|
||||||
|
|
||||||
@Option(name = "--aio", description = "Force aio journal on the configuration regardless of the library being available or not.")
|
@Option(name = "--aio", description = "sets the journal as asyncio.")
|
||||||
boolean forceLibaio;
|
boolean aio;
|
||||||
|
|
||||||
@Option(name = "--nio", description = "Force nio journal on the configuration regardless of the library being available or not.")
|
@Option(name = "--nio", description = "sets the journal as nio.")
|
||||||
boolean forceNIO;
|
boolean nio;
|
||||||
|
|
||||||
|
// this is used by the setupJournalType method
|
||||||
|
private JournalType journalType;
|
||||||
|
|
||||||
@Option(name = "--disable-persistence", description = "Disable message persistence to the journal")
|
@Option(name = "--disable-persistence", description = "Disable message persistence to the journal")
|
||||||
boolean disablePersistence;
|
boolean disablePersistence;
|
||||||
|
@ -558,15 +562,13 @@ public class Create extends InputAbstract {
|
||||||
throw new RuntimeException(String.format("The path '%s' is not writable.", directory));
|
throw new RuntimeException(String.format("The path '%s' is not writable.", directory));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object run(ActionContext context) throws Exception {
|
public Object run(ActionContext context) throws Exception {
|
||||||
if (forceLibaio && forceNIO) {
|
|
||||||
throw new RuntimeException("You can't specify --nio and --aio in the same execution.");
|
|
||||||
}
|
|
||||||
|
|
||||||
IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");
|
IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");
|
||||||
IS_CYGWIN = IS_WINDOWS && "cygwin".equals(System.getenv("OSTYPE"));
|
IS_CYGWIN = IS_WINDOWS && "cygwin".equals(System.getenv("OSTYPE"));
|
||||||
|
|
||||||
|
setupJournalType();
|
||||||
|
|
||||||
// requireLogin should set alloAnonymous=false, to avoid user's questions
|
// requireLogin should set alloAnonymous=false, to avoid user's questions
|
||||||
if (requireLogin != null && requireLogin.booleanValue()) {
|
if (requireLogin != null && requireLogin.booleanValue()) {
|
||||||
allowAnonymous = Boolean.FALSE;
|
allowAnonymous = Boolean.FALSE;
|
||||||
|
@ -603,15 +605,7 @@ public class Create extends InputAbstract {
|
||||||
filters.put("${shared-store.settings}", "");
|
filters.put("${shared-store.settings}", "");
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean aio;
|
filters.put("${journal.settings}", journalType.name());
|
||||||
|
|
||||||
if (IS_WINDOWS || !supportsLibaio()) {
|
|
||||||
aio = false;
|
|
||||||
filters.put("${journal.settings}", "NIO");
|
|
||||||
} else {
|
|
||||||
aio = true;
|
|
||||||
filters.put("${journal.settings}", "ASYNCIO");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sslKey != null) {
|
if (sslKey != null) {
|
||||||
filters.put("${web.protocol}", "https");
|
filters.put("${web.protocol}", "https");
|
||||||
|
@ -761,7 +755,7 @@ public class Create extends InputAbstract {
|
||||||
|
|
||||||
filters.put("${auto-create}", isAutoCreate() ? "true" : "false");
|
filters.put("${auto-create}", isAutoCreate() ? "true" : "false");
|
||||||
|
|
||||||
performAutoTune(filters, aio, dataFolder);
|
performAutoTune(filters, journalType, dataFolder);
|
||||||
|
|
||||||
write(ETC_BROKER_XML, filters, false);
|
write(ETC_BROKER_XML, filters, false);
|
||||||
write(ETC_ARTEMIS_USERS_PROPERTIES, filters, false);
|
write(ETC_ARTEMIS_USERS_PROPERTIES, filters, false);
|
||||||
|
@ -802,6 +796,38 @@ public class Create extends InputAbstract {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setupJournalType() {
|
||||||
|
int countJournalTypes = countBoolean(aio, nio);
|
||||||
|
if (countJournalTypes > 1) {
|
||||||
|
throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped).");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (countJournalTypes == 0) {
|
||||||
|
if (supportsLibaio()) {
|
||||||
|
aio = true;
|
||||||
|
} else {
|
||||||
|
nio = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aio) {
|
||||||
|
journalType = JournalType.ASYNCIO;
|
||||||
|
} else if (nio) {
|
||||||
|
journalType = JournalType.NIO;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static int countBoolean(boolean...b) {
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
for (boolean itemB : b) {
|
||||||
|
if (itemB) count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
private String getLogManager() throws IOException {
|
private String getLogManager() throws IOException {
|
||||||
String logManager = "";
|
String logManager = "";
|
||||||
File dir = new File(path(getHome().toString(), false) + "/lib");
|
File dir = new File(path(getHome().toString(), false) + "/lib");
|
||||||
|
@ -858,7 +884,7 @@ public class Create extends InputAbstract {
|
||||||
filters.put("${address-queue.settings}", writer.toString());
|
filters.put("${address-queue.settings}", writer.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void performAutoTune(HashMap<String, String> filters, boolean aio, File dataFolder) {
|
private void performAutoTune(HashMap<String, String> filters, JournalType journalType, File dataFolder) {
|
||||||
if (noAutoTune) {
|
if (noAutoTune) {
|
||||||
filters.put("${journal-buffer.settings}", "");
|
filters.put("${journal-buffer.settings}", "");
|
||||||
} else {
|
} else {
|
||||||
|
@ -867,7 +893,7 @@ public class Create extends InputAbstract {
|
||||||
System.out.println("");
|
System.out.println("");
|
||||||
System.out.println("Auto tuning journal ...");
|
System.out.println("Auto tuning journal ...");
|
||||||
|
|
||||||
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, aio);
|
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, journalType);
|
||||||
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
|
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
|
||||||
double writesPerMillisecond = (double) writes / (double) time;
|
double writesPerMillisecond = (double) writes / (double) time;
|
||||||
|
|
||||||
|
@ -891,13 +917,10 @@ public class Create extends InputAbstract {
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean supportsLibaio() {
|
public boolean supportsLibaio() {
|
||||||
if (forceLibaio) {
|
if (IS_WINDOWS) {
|
||||||
// forcing libaio
|
|
||||||
return true;
|
|
||||||
} else if (forceNIO) {
|
|
||||||
// forcing NIO
|
|
||||||
return false;
|
return false;
|
||||||
} else if (LibaioContext.isLoaded()) {
|
}
|
||||||
|
if (LibaioContext.isLoaded()) {
|
||||||
try (LibaioContext context = new LibaioContext(1, true, true)) {
|
try (LibaioContext context = new LibaioContext(1, true, true)) {
|
||||||
File tmpFile = new File(directory, "validateAIO.bin");
|
File tmpFile = new File(directory, "validateAIO.bin");
|
||||||
boolean supportsLibaio = true;
|
boolean supportsLibaio = true;
|
||||||
|
|
|
@ -20,13 +20,35 @@ package org.apache.activemq.artemis.cli.commands.tools;
|
||||||
import java.text.DecimalFormat;
|
import java.text.DecimalFormat;
|
||||||
|
|
||||||
import io.airlift.airline.Command;
|
import io.airlift.airline.Command;
|
||||||
|
import io.airlift.airline.Option;
|
||||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||||
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
||||||
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
|
|
||||||
@Command(name = "sync", description = "Calculates the journal-buffer-timeout you should use with the current data folder")
|
@Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder")
|
||||||
public class SyncRecalc extends LockAbstract {
|
public class PerfJournal extends LockAbstract {
|
||||||
|
|
||||||
|
|
||||||
|
@Option(name = "--block-size", description = "The block size for each write (default 4096)")
|
||||||
|
public int size = 4 * 1024;
|
||||||
|
|
||||||
|
|
||||||
|
@Option(name = "--writes", description = "The number of writes to be performed (default 250)")
|
||||||
|
public int writes = 250;
|
||||||
|
|
||||||
|
@Option(name = "--tries", description = "The number of tries for the test (default 5)")
|
||||||
|
public int tries = 5;
|
||||||
|
|
||||||
|
@Option(name = "--no-sync", description = "Disable sync")
|
||||||
|
public boolean nosyncs = false;
|
||||||
|
|
||||||
|
@Option(name = "--sync", description = "Enable syncs")
|
||||||
|
public boolean syncs = false;
|
||||||
|
|
||||||
|
@Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)")
|
||||||
|
public String journalType = null;
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object execute(ActionContext context) throws Exception {
|
public Object execute(ActionContext context) throws Exception {
|
||||||
|
@ -34,11 +56,26 @@ public class SyncRecalc extends LockAbstract {
|
||||||
|
|
||||||
FileConfiguration fileConfiguration = getFileConfiguration();
|
FileConfiguration fileConfiguration = getFileConfiguration();
|
||||||
|
|
||||||
int writes = 250;
|
if (nosyncs) {
|
||||||
|
fileConfiguration.setJournalDatasync(false);
|
||||||
|
} else if (syncs) {
|
||||||
|
fileConfiguration.setJournalDatasync(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (journalType != null) {
|
||||||
|
fileConfiguration.setJournalType(JournalType.getType(journalType));
|
||||||
|
}
|
||||||
|
|
||||||
System.out.println("");
|
System.out.println("");
|
||||||
System.out.println("Auto tuning journal ...");
|
System.out.println("Auto tuning journal ...");
|
||||||
|
|
||||||
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), 4096, writes, 5, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType() == JournalType.ASYNCIO);
|
System.out.println("Performing " + tries + " tests writing " + writes + " blocks of " + size + " on each test, sync=" + fileConfiguration.isJournalDatasync() + " with journalType = " + fileConfiguration.getJournalType());
|
||||||
|
|
||||||
|
fileConfiguration.getJournalLocation().mkdirs();
|
||||||
|
|
||||||
|
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType());
|
||||||
|
|
||||||
long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
|
long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
|
||||||
double writesPerMillisecond = (double) writes / (double) time;
|
double writesPerMillisecond = (double) writes / (double) time;
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
|
||||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
|
||||||
|
@ -47,8 +49,8 @@ public class SyncCalculation {
|
||||||
int tries,
|
int tries,
|
||||||
boolean verbose,
|
boolean verbose,
|
||||||
boolean fsync,
|
boolean fsync,
|
||||||
boolean aio) throws Exception {
|
JournalType journalType) throws Exception {
|
||||||
SequentialFileFactory factory = newFactory(datafolder, fsync, aio);
|
SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks);
|
||||||
|
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
System.out.println("Using " + factory.getClass().getName() + " to calculate sync times");
|
System.out.println("Using " + factory.getClass().getName() + " to calculate sync times");
|
||||||
|
@ -61,6 +63,8 @@ public class SyncCalculation {
|
||||||
|
|
||||||
file.fill(blockSize * blocks);
|
file.fill(blockSize * blocks);
|
||||||
|
|
||||||
|
file.close();
|
||||||
|
|
||||||
long[] result = new long[tries];
|
long[] result = new long[tries];
|
||||||
|
|
||||||
byte[] block = new byte[blockSize];
|
byte[] block = new byte[blockSize];
|
||||||
|
@ -94,6 +98,7 @@ public class SyncCalculation {
|
||||||
System.out.println("**************************************************");
|
System.out.println("**************************************************");
|
||||||
System.out.println(ntry + " of " + tries + " calculation");
|
System.out.println(ntry + " of " + tries + " calculation");
|
||||||
}
|
}
|
||||||
|
file.open();
|
||||||
file.position(0);
|
file.position(0);
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
for (int i = 0; i < blocks; i++) {
|
for (int i = 0; i < blocks; i++) {
|
||||||
|
@ -115,6 +120,7 @@ public class SyncCalculation {
|
||||||
System.out.println("bufferTimeout = " + toNanos(result[ntry], blocks, verbose));
|
System.out.println("bufferTimeout = " + toNanos(result[ntry], blocks, verbose));
|
||||||
System.out.println("**************************************************");
|
System.out.println("**************************************************");
|
||||||
}
|
}
|
||||||
|
file.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
factory.releaseDirectBuffer(bufferBlock);
|
factory.releaseDirectBuffer(bufferBlock);
|
||||||
|
@ -162,17 +168,26 @@ public class SyncCalculation {
|
||||||
return timeWait;
|
return timeWait;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static SequentialFileFactory newFactory(File datafolder, boolean datasync, boolean aio) {
|
private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize) {
|
||||||
if (aio && LibaioContext.isLoaded()) {
|
SequentialFileFactory factory;
|
||||||
SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
|
|
||||||
factory.start();
|
|
||||||
((AIOSequentialFileFactory) factory).disableBufferReuse();
|
|
||||||
|
|
||||||
return factory;
|
if (journalType == JournalType.ASYNCIO && !LibaioContext.isLoaded()) {
|
||||||
} else {
|
journalType = JournalType.NIO;
|
||||||
SequentialFileFactory factory = new NIOSequentialFileFactory(datafolder, 1);
|
}
|
||||||
factory.start();
|
|
||||||
return factory;
|
switch (journalType) {
|
||||||
|
|
||||||
|
case NIO:
|
||||||
|
factory = new NIOSequentialFileFactory(datafolder, 1);
|
||||||
|
factory.start();
|
||||||
|
return factory;
|
||||||
|
case ASYNCIO:
|
||||||
|
factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
|
||||||
|
factory.start();
|
||||||
|
((AIOSequentialFileFactory) factory).disableBufferReuse();
|
||||||
|
return factory;
|
||||||
|
default:
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(journalType);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
|
||||||
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
||||||
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
|
@ -114,7 +115,7 @@ public class ArtemisTest extends CliTestBase {
|
||||||
public void testSync() throws Exception {
|
public void testSync() throws Exception {
|
||||||
int writes = 20;
|
int writes = 20;
|
||||||
int tries = 10;
|
int tries = 10;
|
||||||
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true);
|
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, JournalType.NIO);
|
||||||
System.out.println();
|
System.out.println();
|
||||||
System.out.println("TotalAvg = " + totalAvg);
|
System.out.println("TotalAvg = " + totalAvg);
|
||||||
long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false);
|
long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false);
|
||||||
|
@ -130,6 +131,19 @@ public class ArtemisTest extends CliTestBase {
|
||||||
Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync");
|
Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleCreateMapped() throws Throwable {
|
||||||
|
try {
|
||||||
|
//instance1: default using http
|
||||||
|
File instance1 = new File(temporaryFolder.getRoot(), "instance1");
|
||||||
|
Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--mapped");
|
||||||
|
} catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWebConfig() throws Exception {
|
public void testWebConfig() throws Exception {
|
||||||
setupAuth();
|
setupAuth();
|
||||||
|
@ -500,6 +514,20 @@ public class ArtemisTest extends CliTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPerfJournal() throws Exception {
|
||||||
|
File instanceFolder = temporaryFolder.newFolder("server1");
|
||||||
|
setupAuth(instanceFolder);
|
||||||
|
|
||||||
|
Run.setEmbedded(true);
|
||||||
|
Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--no-autotune", "--require-login");
|
||||||
|
System.setProperty("artemis.instance", instanceFolder.getAbsolutePath());
|
||||||
|
|
||||||
|
Artemis.main("perf-journal", "--journal-type", "NIO", "--writes", "5000", "--tries", "50", "--verbose");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public void testSimpleRun(String folderName) throws Exception {
|
public void testSimpleRun(String folderName) throws Exception {
|
||||||
File instanceFolder = temporaryFolder.newFolder(folderName);
|
File instanceFolder = temporaryFolder.newFolder(folderName);
|
||||||
|
|
||||||
|
|
|
@ -118,11 +118,6 @@ public class JDBCSequentialFile implements SequentialFile {
|
||||||
return writePosition + size <= dbDriver.getMaxSize();
|
return writePosition + size <= dbDriver.getMaxSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getAlignment() throws Exception {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int calculateBlockStart(int position) throws Exception {
|
public int calculateBlockStart(int position) throws Exception {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -176,6 +176,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JDBCSequentialFileFactory setAlignment(int alignment) {
|
||||||
|
// no op
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int calculateBlockSize(final int bytes) {
|
public int calculateBlockSize(final int bytes) {
|
||||||
return bytes;
|
return bytes;
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.jms.persistence.impl.journal;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -78,8 +79,9 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
|
||||||
final IDGenerator idGenerator,
|
final IDGenerator idGenerator,
|
||||||
final Configuration config,
|
final Configuration config,
|
||||||
final ReplicationManager replicator) {
|
final ReplicationManager replicator) {
|
||||||
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO) {
|
final EnumSet<JournalType> supportedJournalTypes = EnumSet.allOf(JournalType.class);
|
||||||
throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
|
if (!supportedJournalTypes.contains(config.getJournalType())) {
|
||||||
|
throw new IllegalArgumentException("Only " + supportedJournalTypes + " are supported Journal types");
|
||||||
}
|
}
|
||||||
|
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
|
|
@ -54,6 +54,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
|
||||||
|
|
||||||
protected boolean dataSync = true;
|
protected boolean dataSync = true;
|
||||||
|
|
||||||
|
protected volatile int alignment = -1;
|
||||||
|
|
||||||
private final IOCriticalErrorListener critialErrorListener;
|
private final IOCriticalErrorListener critialErrorListener;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -83,6 +85,20 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
|
||||||
this.maxIO = maxIO;
|
this.maxIO = maxIO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getAlignment() {
|
||||||
|
if (alignment < 0) {
|
||||||
|
alignment = 1;
|
||||||
|
}
|
||||||
|
return alignment;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AbstractSequentialFileFactory setAlignment(int alignment) {
|
||||||
|
this.alignment = alignment;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SequentialFileFactory setDatasync(boolean enabled) {
|
public SequentialFileFactory setDatasync(boolean enabled) {
|
||||||
|
|
|
@ -43,8 +43,6 @@ public interface SequentialFile {
|
||||||
|
|
||||||
boolean fits(int size);
|
boolean fits(int size);
|
||||||
|
|
||||||
int getAlignment() throws Exception;
|
|
||||||
|
|
||||||
int calculateBlockStart(int position) throws Exception;
|
int calculateBlockStart(int position) throws Exception;
|
||||||
|
|
||||||
String getFileName();
|
String getFileName();
|
||||||
|
|
|
@ -79,6 +79,8 @@ public interface SequentialFileFactory {
|
||||||
|
|
||||||
int getAlignment();
|
int getAlignment();
|
||||||
|
|
||||||
|
SequentialFileFactory setAlignment(int alignment);
|
||||||
|
|
||||||
int calculateBlockSize(int bytes);
|
int calculateBlockSize(int bytes);
|
||||||
|
|
||||||
File getDirectory();
|
File getDirectory();
|
||||||
|
|
|
@ -78,17 +78,9 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
||||||
return opened;
|
return opened;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getAlignment() {
|
|
||||||
// TODO: get the alignment from the file system, but we have to cache this, we can't call it every time
|
|
||||||
/* checkOpened();
|
|
||||||
return aioFile.getBlockSize(); */
|
|
||||||
return 512;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int calculateBlockStart(final int position) {
|
public int calculateBlockStart(final int position) {
|
||||||
int alignment = getAlignment();
|
int alignment = factory.getAlignment();
|
||||||
|
|
||||||
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
|
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
|
||||||
|
|
||||||
|
|
|
@ -143,13 +143,13 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer allocateDirectBuffer(final int size) {
|
public ByteBuffer allocateDirectBuffer(final int size) {
|
||||||
|
|
||||||
int blocks = size / 512;
|
int blocks = size / getAlignment();
|
||||||
if (size % 512 != 0) {
|
if (size % getAlignment() != 0) {
|
||||||
blocks++;
|
blocks++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The buffer on AIO has to be a multiple of 512
|
// The buffer on AIO has to be a multiple of getAlignment()
|
||||||
ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * 512, 512);
|
ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * getAlignment(), getAlignment());
|
||||||
|
|
||||||
buffer.limit(size);
|
buffer.limit(size);
|
||||||
|
|
||||||
|
@ -163,8 +163,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ByteBuffer newBuffer(int size) {
|
public ByteBuffer newBuffer(int size) {
|
||||||
if (size % 512 != 0) {
|
if (size % getAlignment() != 0) {
|
||||||
size = (size / 512 + 1) * 512;
|
size = (size / getAlignment() + 1) * getAlignment();
|
||||||
}
|
}
|
||||||
|
|
||||||
return buffersControl.newBuffer(size);
|
return buffersControl.newBuffer(size);
|
||||||
|
@ -178,7 +178,26 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getAlignment() {
|
public int getAlignment() {
|
||||||
return 512;
|
if (alignment < 0) {
|
||||||
|
|
||||||
|
File checkFile = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
journalDir.mkdirs();
|
||||||
|
checkFile = File.createTempFile("journalCheck", ".tmp", journalDir);
|
||||||
|
checkFile.mkdirs();
|
||||||
|
checkFile.createNewFile();
|
||||||
|
alignment = LibaioContext.getBlockSize(checkFile);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
|
alignment = 512;
|
||||||
|
} finally {
|
||||||
|
if (checkFile != null) {
|
||||||
|
checkFile.delete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return alignment;
|
||||||
}
|
}
|
||||||
|
|
||||||
// For tests only
|
// For tests only
|
||||||
|
@ -399,7 +418,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
// if a buffer is bigger than the configured-bufferSize, we just create a new
|
// if a buffer is bigger than the configured-bufferSize, we just create a new
|
||||||
// buffer.
|
// buffer.
|
||||||
if (size > bufferSize) {
|
if (size > bufferSize) {
|
||||||
return LibaioContext.newAlignedBuffer(size, 512);
|
return LibaioContext.newAlignedBuffer(size, getAlignment());
|
||||||
} else {
|
} else {
|
||||||
// We need to allocate buffers following the rules of the storage
|
// We need to allocate buffers following the rules of the storage
|
||||||
// being used (AIO/NIO)
|
// being used (AIO/NIO)
|
||||||
|
@ -410,7 +429,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
||||||
|
|
||||||
if (buffer == null) {
|
if (buffer == null) {
|
||||||
// if empty create a new one.
|
// if empty create a new one.
|
||||||
buffer = LibaioContext.newAlignedBuffer(size, 512);
|
buffer = LibaioContext.newAlignedBuffer(size, getAlignment());
|
||||||
|
|
||||||
buffer.limit(alignedSize);
|
buffer.limit(alignedSize);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -113,11 +113,6 @@ final class MappedSequentialFile implements SequentialFile {
|
||||||
return hasRemaining;
|
return hasRemaining;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getAlignment() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int calculateBlockStart(int position) {
|
public int calculateBlockStart(int position) {
|
||||||
return position;
|
return position;
|
||||||
|
|
|
@ -161,6 +161,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SequentialFileFactory setAlignment(int alignment) {
|
||||||
|
// no op
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int calculateBlockSize(int bytes) {
|
public int calculateBlockSize(int bytes) {
|
||||||
return bytes;
|
return bytes;
|
||||||
|
|
|
@ -53,11 +53,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
||||||
defaultMaxIO = maxIO;
|
defaultMaxIO = maxIO;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getAlignment() {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int calculateBlockStart(final int position) {
|
public int calculateBlockStart(final int position) {
|
||||||
return position;
|
return position;
|
||||||
|
|
|
@ -39,9 +39,7 @@ public final class LibaioFile<Callback extends SubmitInfo> implements AutoClosea
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getBlockSize() {
|
public int getBlockSize() {
|
||||||
return 512;
|
return LibaioContext.getBlockSizeFD(fd);
|
||||||
// FIXME
|
|
||||||
//return LibaioContext.getBlockSizeFD(fd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean lock() {
|
public boolean lock() {
|
||||||
|
|
|
@ -563,7 +563,7 @@ public interface Configuration {
|
||||||
Configuration setJournalDirectory(String dir);
|
Configuration setJournalDirectory(String dir);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the type of journal used by this server (either {@code NIO} or {@code ASYNCIO}).
|
* Returns the type of journal used by this server ({@code NIO}, {@code ASYNCIO} or {@code MAPPED}).
|
||||||
* <br>
|
* <br>
|
||||||
* Default value is ASYNCIO.
|
* Default value is ASYNCIO.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.config.impl;
|
package org.apache.activemq.artemis.core.config.impl;
|
||||||
|
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||||
import org.apache.activemq.artemis.core.server.JournalType;
|
import org.apache.activemq.artemis.core.server.JournalType;
|
||||||
|
@ -125,7 +127,7 @@ public final class Validators {
|
||||||
@Override
|
@Override
|
||||||
public void validate(final String name, final Object value) {
|
public void validate(final String name, final Object value) {
|
||||||
String val = (String) value;
|
String val = (String) value;
|
||||||
if (val == null || !val.equals(JournalType.NIO.toString()) && !val.equals(JournalType.ASYNCIO.toString())) {
|
if (val == null || !EnumSet.allOf(JournalType.class).contains(JournalType.valueOf(val))) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType(val);
|
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType(val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -497,22 +497,19 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
String s = getString(e, "journal-type", config.getJournalType().toString(), Validators.JOURNAL_TYPE);
|
String s = getString(e, "journal-type", config.getJournalType().toString(), Validators.JOURNAL_TYPE);
|
||||||
|
|
||||||
if (s.equals(JournalType.NIO.toString())) {
|
config.setJournalType(JournalType.getType(s));
|
||||||
config.setJournalType(JournalType.NIO);
|
|
||||||
} else if (s.equals(JournalType.ASYNCIO.toString())) {
|
if (config.getJournalType() == JournalType.ASYNCIO) {
|
||||||
// https://jira.jboss.org/jira/browse/HORNETQ-295
|
// https://jira.jboss.org/jira/browse/HORNETQ-295
|
||||||
// We do the check here to see if AIO is supported so we can use the correct defaults and/or use
|
// We do the check here to see if AIO is supported so we can use the correct defaults and/or use
|
||||||
// correct settings in xml
|
// correct settings in xml
|
||||||
// If we fall back later on these settings can be ignored
|
// If we fall back later on these settings can be ignored
|
||||||
boolean supportsAIO = AIOSequentialFileFactory.isSupported();
|
boolean supportsAIO = AIOSequentialFileFactory.isSupported();
|
||||||
|
|
||||||
if (supportsAIO) {
|
if (!supportsAIO) {
|
||||||
config.setJournalType(JournalType.ASYNCIO);
|
|
||||||
} else {
|
|
||||||
if (validateAIO) {
|
if (validateAIO) {
|
||||||
ActiveMQServerLogger.LOGGER.AIONotFound();
|
ActiveMQServerLogger.LOGGER.AIONotFound();
|
||||||
}
|
}
|
||||||
|
|
||||||
config.setJournalType(JournalType.NIO);
|
config.setJournalType(JournalType.NIO);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.File;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -113,7 +114,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
@Override
|
@Override
|
||||||
protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
|
protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
|
||||||
|
|
||||||
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO) {
|
if (!EnumSet.allOf(JournalType.class).contains(config.getJournalType())) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidJournal();
|
throw ActiveMQMessageBundle.BUNDLE.invalidJournal();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,21 +126,23 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
bindingsJournal = localBindings;
|
bindingsJournal = localBindings;
|
||||||
originalBindingsJournal = localBindings;
|
originalBindingsJournal = localBindings;
|
||||||
|
|
||||||
if (config.getJournalType() == JournalType.ASYNCIO) {
|
switch (config.getJournalType()) {
|
||||||
ActiveMQServerLogger.LOGGER.journalUseAIO();
|
|
||||||
|
|
||||||
journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener);
|
case NIO:
|
||||||
} else if (config.getJournalType() == JournalType.NIO) {
|
ActiveMQServerLogger.LOGGER.journalUseNIO();
|
||||||
ActiveMQServerLogger.LOGGER.journalUseNIO();
|
journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener);
|
||||||
journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener);
|
break;
|
||||||
} else {
|
case ASYNCIO:
|
||||||
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
|
ActiveMQServerLogger.LOGGER.journalUseAIO();
|
||||||
|
journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
|
||||||
}
|
}
|
||||||
|
|
||||||
journalFF.setDatasync(config.isJournalDatasync());
|
journalFF.setDatasync(config.isJournalDatasync());
|
||||||
|
|
||||||
|
Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0);
|
||||||
Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0);
|
|
||||||
|
|
||||||
messageJournal = localMessage;
|
messageJournal = localMessage;
|
||||||
originalMessageJournal = localMessage;
|
originalMessageJournal = localMessage;
|
||||||
|
|
|
@ -16,6 +16,32 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.server;
|
package org.apache.activemq.artemis.core.server;
|
||||||
|
|
||||||
|
|
||||||
public enum JournalType {
|
public enum JournalType {
|
||||||
NIO, ASYNCIO;
|
NIO, ASYNCIO, MAPPED;
|
||||||
|
|
||||||
|
public static final String validValues;
|
||||||
|
|
||||||
|
static {
|
||||||
|
StringBuffer stringBuffer = new StringBuffer();
|
||||||
|
for (JournalType type : JournalType.values()) {
|
||||||
|
|
||||||
|
if (stringBuffer.length() != 0) {
|
||||||
|
stringBuffer.append(",");
|
||||||
|
}
|
||||||
|
|
||||||
|
stringBuffer.append(type.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
validValues = stringBuffer.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static JournalType getType(String type) {
|
||||||
|
switch (type) {
|
||||||
|
case "NIO": return NIO;
|
||||||
|
case "ASYNCIO" : return ASYNCIO;
|
||||||
|
default: throw new IllegalStateException("Invalid JournalType:" + type + " valid Types: " + validValues);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,7 @@ public class AIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase
|
||||||
SequentialFile file = factory.createSequentialFile("filtetmp.log");
|
SequentialFile file = factory.createSequentialFile("filtetmp.log");
|
||||||
file.open();
|
file.open();
|
||||||
ByteBuffer buff = factory.newBuffer(10);
|
ByteBuffer buff = factory.newBuffer(10);
|
||||||
Assert.assertEquals(512, buff.limit());
|
Assert.assertEquals(factory.getAlignment(), buff.limit());
|
||||||
file.close();
|
file.close();
|
||||||
factory.releaseBuffer(buff);
|
factory.releaseBuffer(buff);
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ public class NIOImportExportTest extends JournalImplTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExportImport() throws Exception {
|
public void testExportImport() throws Exception {
|
||||||
setup(10, 10 * 1024, true);
|
setup(10, 10 * 4096, true);
|
||||||
|
|
||||||
createJournal();
|
createJournal();
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ public class NIOImportExportTest extends JournalImplTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExportImport3() throws Exception {
|
public void testExportImport3() throws Exception {
|
||||||
setup(10, 10 * 1024, true);
|
setup(10, 10 * 4096, true);
|
||||||
|
|
||||||
createJournal();
|
createJournal();
|
||||||
|
|
||||||
|
@ -162,7 +162,7 @@ public class NIOImportExportTest extends JournalImplTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testExportImport2() throws Exception {
|
public void testExportImport2() throws Exception {
|
||||||
setup(10, 10 * 1024, true);
|
setup(10, 10 * 4096, true);
|
||||||
|
|
||||||
createJournal();
|
createJournal();
|
||||||
|
|
||||||
|
|
|
@ -479,7 +479,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
|
||||||
performNonTransactionalDelete = false;
|
performNonTransactionalDelete = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
setup(2, 60 * 1024, false);
|
setup(2, 60 * 4096, false);
|
||||||
|
|
||||||
ArrayList<Long> liveIDs = new ArrayList<>();
|
ArrayList<Long> liveIDs = new ArrayList<>();
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ public class CleanBufferTest extends ActiveMQTestBase {
|
||||||
@Test
|
@Test
|
||||||
public void testCleanOnAIO() {
|
public void testCleanOnAIO() {
|
||||||
if (LibaioContext.isLoaded()) {
|
if (LibaioContext.isLoaded()) {
|
||||||
SequentialFileFactory factory = new AIOSequentialFileFactory(new File("Whatever"), 50);
|
SequentialFileFactory factory = new AIOSequentialFileFactory(new File("./target"), 50);
|
||||||
|
|
||||||
testBuffer(factory);
|
testBuffer(factory);
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,9 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
|
||||||
@Override
|
@Override
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
|
//stop journal first to let it manage its files
|
||||||
|
stopComponent(journal);
|
||||||
|
|
||||||
List<String> files = fileFactory.listFiles(fileExtension);
|
List<String> files = fileFactory.listFiles(fileExtension);
|
||||||
|
|
||||||
for (String file : files) {
|
for (String file : files) {
|
||||||
|
|
|
@ -132,11 +132,11 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
checkFill(sf, 2048);
|
checkFill(factory, sf, 2048);
|
||||||
|
|
||||||
checkFill(sf, 512);
|
checkFill(factory, sf, 512);
|
||||||
|
|
||||||
checkFill(sf, 512 * 4);
|
checkFill(factory, sf, 512 * 4);
|
||||||
} finally {
|
} finally {
|
||||||
sf.close();
|
sf.close();
|
||||||
}
|
}
|
||||||
|
@ -226,19 +226,19 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
|
||||||
sf.write(bb1, true);
|
sf.write(bb1, true);
|
||||||
long bytesWritten = sf.position() - initialPos;
|
long bytesWritten = sf.position() - initialPos;
|
||||||
|
|
||||||
Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
|
Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesWritten);
|
||||||
|
|
||||||
initialPos = sf.position();
|
initialPos = sf.position();
|
||||||
sf.write(bb2, true);
|
sf.write(bb2, true);
|
||||||
bytesWritten = sf.position() - initialPos;
|
bytesWritten = sf.position() - initialPos;
|
||||||
|
|
||||||
Assert.assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
|
Assert.assertEquals(calculateRecordSize(bytes2.length, factory.getAlignment()), bytesWritten);
|
||||||
|
|
||||||
initialPos = sf.position();
|
initialPos = sf.position();
|
||||||
sf.write(bb3, true);
|
sf.write(bb3, true);
|
||||||
bytesWritten = sf.position() - initialPos;
|
bytesWritten = sf.position() - initialPos;
|
||||||
|
|
||||||
Assert.assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
|
Assert.assertEquals(calculateRecordSize(bytes3.length, factory.getAlignment()), bytesWritten);
|
||||||
|
|
||||||
sf.position(0);
|
sf.position(0);
|
||||||
|
|
||||||
|
@ -247,20 +247,20 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
|
||||||
ByteBuffer rb3 = factory.newBuffer(bytes3.length);
|
ByteBuffer rb3 = factory.newBuffer(bytes3.length);
|
||||||
|
|
||||||
int bytesRead = sf.read(rb1);
|
int bytesRead = sf.read(rb1);
|
||||||
Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesRead);
|
Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesRead);
|
||||||
|
|
||||||
for (int i = 0; i < bytes1.length; i++) {
|
for (int i = 0; i < bytes1.length; i++) {
|
||||||
Assert.assertEquals(bytes1[i], rb1.get(i));
|
Assert.assertEquals(bytes1[i], rb1.get(i));
|
||||||
}
|
}
|
||||||
|
|
||||||
bytesRead = sf.read(rb2);
|
bytesRead = sf.read(rb2);
|
||||||
Assert.assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesRead);
|
Assert.assertEquals(calculateRecordSize(bytes2.length, factory.getAlignment()), bytesRead);
|
||||||
for (int i = 0; i < bytes2.length; i++) {
|
for (int i = 0; i < bytes2.length; i++) {
|
||||||
Assert.assertEquals(bytes2[i], rb2.get(i));
|
Assert.assertEquals(bytes2[i], rb2.get(i));
|
||||||
}
|
}
|
||||||
|
|
||||||
bytesRead = sf.read(rb3);
|
bytesRead = sf.read(rb3);
|
||||||
Assert.assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesRead);
|
Assert.assertEquals(calculateRecordSize(bytes3.length, factory.getAlignment()), bytesRead);
|
||||||
for (int i = 0; i < bytes3.length; i++) {
|
for (int i = 0; i < bytes3.length; i++) {
|
||||||
Assert.assertEquals(bytes3[i], rb3.get(i));
|
Assert.assertEquals(bytes3[i], rb3.get(i));
|
||||||
}
|
}
|
||||||
|
@ -291,19 +291,19 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
|
||||||
sf.write(wrapBuffer(bytes1), true);
|
sf.write(wrapBuffer(bytes1), true);
|
||||||
long bytesWritten = sf.position() - initialPos;
|
long bytesWritten = sf.position() - initialPos;
|
||||||
|
|
||||||
Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
|
Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesWritten);
|
||||||
|
|
||||||
initialPos = sf.position();
|
initialPos = sf.position();
|
||||||
sf.write(wrapBuffer(bytes2), true);
|
sf.write(wrapBuffer(bytes2), true);
|
||||||
bytesWritten = sf.position() - initialPos;
|
bytesWritten = sf.position() - initialPos;
|
||||||
|
|
||||||
Assert.assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
|
Assert.assertEquals(calculateRecordSize(bytes2.length, factory.getAlignment()), bytesWritten);
|
||||||
|
|
||||||
initialPos = sf.position();
|
initialPos = sf.position();
|
||||||
sf.write(wrapBuffer(bytes3), true);
|
sf.write(wrapBuffer(bytes3), true);
|
||||||
bytesWritten = sf.position() - initialPos;
|
bytesWritten = sf.position() - initialPos;
|
||||||
|
|
||||||
Assert.assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
|
Assert.assertEquals(calculateRecordSize(bytes3.length, factory.getAlignment()), bytesWritten);
|
||||||
|
|
||||||
byte[] rbytes1 = new byte[bytes1.length];
|
byte[] rbytes1 = new byte[bytes1.length];
|
||||||
|
|
||||||
|
@ -315,7 +315,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
|
||||||
ByteBuffer rb2 = factory.newBuffer(rbytes2.length);
|
ByteBuffer rb2 = factory.newBuffer(rbytes2.length);
|
||||||
ByteBuffer rb3 = factory.newBuffer(rbytes3.length);
|
ByteBuffer rb3 = factory.newBuffer(rbytes3.length);
|
||||||
|
|
||||||
sf.position(calculateRecordSize(bytes1.length, sf.getAlignment()) + calculateRecordSize(bytes2.length, sf.getAlignment()));
|
sf.position(calculateRecordSize(bytes1.length, factory.getAlignment()) + calculateRecordSize(bytes2.length, factory.getAlignment()));
|
||||||
|
|
||||||
int bytesRead = sf.read(rb3);
|
int bytesRead = sf.read(rb3);
|
||||||
Assert.assertEquals(rb3.limit(), bytesRead);
|
Assert.assertEquals(rb3.limit(), bytesRead);
|
||||||
|
@ -361,7 +361,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
|
||||||
sf.write(wrapBuffer(bytes1), true);
|
sf.write(wrapBuffer(bytes1), true);
|
||||||
long bytesWritten = sf.position() - initialPos;
|
long bytesWritten = sf.position() - initialPos;
|
||||||
|
|
||||||
Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
|
Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesWritten);
|
||||||
|
|
||||||
sf.close();
|
sf.close();
|
||||||
|
|
||||||
|
@ -386,7 +386,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
|
||||||
return ActiveMQBuffers.wrappedBuffer(bytes);
|
return ActiveMQBuffers.wrappedBuffer(bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void checkFill(final SequentialFile file, final int size) throws Exception {
|
protected void checkFill(final SequentialFileFactory factory, final SequentialFile file, final int size) throws Exception {
|
||||||
file.fill(size);
|
file.fill(size);
|
||||||
|
|
||||||
file.close();
|
file.close();
|
||||||
|
@ -399,7 +399,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
|
||||||
|
|
||||||
int bytesRead = file.read(bb);
|
int bytesRead = file.read(bb);
|
||||||
|
|
||||||
Assert.assertEquals(calculateRecordSize(size, file.getAlignment()), bytesRead);
|
Assert.assertEquals(calculateRecordSize(size, factory.getAlignment()), bytesRead);
|
||||||
|
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
// log.debug(" i is " + i);
|
// log.debug(" i is " + i);
|
||||||
|
|
|
@ -37,7 +37,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
|
||||||
|
|
||||||
private final Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<>();
|
private final Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private final int alignment;
|
private volatile int alignment;
|
||||||
|
|
||||||
private final boolean supportsCallback;
|
private final boolean supportsCallback;
|
||||||
|
|
||||||
|
@ -198,6 +198,12 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
|
||||||
return alignment;
|
return alignment;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FakeSequentialFileFactory setAlignment(int alignment) {
|
||||||
|
this.alignment = alignment;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
// Package protected ---------------------------------------------
|
// Package protected ---------------------------------------------
|
||||||
|
|
||||||
// Protected -----------------------------------------------------
|
// Protected -----------------------------------------------------
|
||||||
|
@ -462,11 +468,6 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getAlignment() throws Exception {
|
|
||||||
return alignment;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int calculateBlockStart(final int position) throws Exception {
|
public int calculateBlockStart(final int position) throws Exception {
|
||||||
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
|
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
|
||||||
|
|
Loading…
Reference in New Issue