From ce035a8084874da3004cded844221629a9a3bc2e Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 1 Feb 2017 22:51:51 -0500 Subject: [PATCH 1/4] ARTEMIS-937 Implementing proper alignment and adding perf-journal tool to validate the journal syncs --- .../apache/activemq/artemis/cli/Artemis.java | 4 +- .../activemq/artemis/cli/commands/Create.java | 75 ++++++++++++------- .../{SyncRecalc.java => PerfJournal.java} | 45 ++++++++++- .../cli/commands/util/SyncCalculation.java | 39 +++++++--- .../apache/activemq/cli/test/ArtemisTest.java | 30 +++++++- .../jdbc/store/file/JDBCSequentialFile.java | 5 -- .../store/file/JDBCSequentialFileFactory.java | 6 ++ .../journal/JMSJournalStorageManagerImpl.java | 6 +- .../io/AbstractSequentialFileFactory.java | 16 ++++ .../artemis/core/io/SequentialFile.java | 2 - .../core/io/SequentialFileFactory.java | 2 + .../core/io/aio/AIOSequentialFile.java | 10 +-- .../core/io/aio/AIOSequentialFileFactory.java | 37 ++++++--- .../core/io/mapped/MappedSequentialFile.java | 5 -- .../mapped/MappedSequentialFileFactory.java | 6 ++ .../core/io/nio/NIOSequentialFile.java | 5 -- .../activemq/artemis/jlibaio/LibaioFile.java | 4 +- .../artemis/core/config/Configuration.java | 2 +- .../artemis/core/config/impl/Validators.java | 4 +- .../impl/FileConfigurationParser.java | 11 +-- .../impl/journal/JournalStorageManager.java | 25 ++++--- .../artemis/core/server/JournalType.java | 28 ++++++- .../journal/AIOSequentialFileFactoryTest.java | 2 +- .../journal/NIOImportExportTest.java | 6 +- .../journal/NIOJournalCompactTest.java | 2 +- .../core/journal/impl/CleanBufferTest.java | 2 +- .../journal/impl/JournalImplTestUnit.java | 3 + .../impl/SequentialFileFactoryTestBase.java | 32 ++++---- .../impl/fakes/FakeSequentialFileFactory.java | 13 ++-- 29 files changed, 293 insertions(+), 134 deletions(-) rename artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/{SyncRecalc.java => PerfJournal.java} (57%) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java index 18449e939c..843b6f1a18 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/Artemis.java @@ -50,7 +50,7 @@ import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal; import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal; import org.apache.activemq.artemis.cli.commands.tools.HelpData; import org.apache.activemq.artemis.cli.commands.tools.PrintData; -import org.apache.activemq.artemis.cli.commands.tools.SyncRecalc; +import org.apache.activemq.artemis.cli.commands.tools.PerfJournal; import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter; import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; import org.apache.activemq.artemis.cli.commands.user.AddUser; @@ -163,7 +163,7 @@ public class Artemis { withDefaultCommand(HelpData.class).withCommands(PrintData.class, XmlDataExporter.class, XmlDataImporter.class, DecodeJournal.class, EncodeJournal.class, CompactJournal.class); builder.withGroup("user").withDescription("default file-based user management (add|rm|list|reset) (example ./artemis user list)"). withDefaultCommand(HelpUser.class).withCommands(ListUser.class, AddUser.class, RemoveUser.class, ResetUser.class); - builder = builder.withCommands(Run.class, Stop.class, Kill.class, SyncRecalc.class); + builder = builder.withCommands(Run.class, Stop.class, Kill.class, PerfJournal.class); } else { builder.withGroup("data").withDescription("data tools group (print) (example ./artemis data print)"). withDefaultCommand(HelpData.class).withCommands(PrintData.class); diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index 20a8043f90..29635e6163 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -40,6 +40,7 @@ import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.CLIException; import org.apache.activemq.artemis.cli.commands.util.HashUtil; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; +import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.jlibaio.LibaioFile; @@ -209,11 +210,14 @@ public class Create extends InputAbstract { @Option(name = "--addresses", description = "comma separated list of addresses ") String addresses; - @Option(name = "--aio", description = "Force aio journal on the configuration regardless of the library being available or not.") - boolean forceLibaio; + @Option(name = "--aio", description = "sets the journal as asyncio.") + boolean aio; - @Option(name = "--nio", description = "Force nio journal on the configuration regardless of the library being available or not.") - boolean forceNIO; + @Option(name = "--nio", description = "sets the journal as nio.") + boolean nio; + + // this is used by the setupJournalType method + private JournalType journalType; @Option(name = "--disable-persistence", description = "Disable message persistence to the journal") boolean disablePersistence; @@ -558,15 +562,13 @@ public class Create extends InputAbstract { throw new RuntimeException(String.format("The path '%s' is not writable.", directory)); } } - public Object run(ActionContext context) throws Exception { - if (forceLibaio && forceNIO) { - throw new RuntimeException("You can't specify --nio and --aio in the same execution."); - } IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win"); IS_CYGWIN = IS_WINDOWS && "cygwin".equals(System.getenv("OSTYPE")); + setupJournalType(); + // requireLogin should set alloAnonymous=false, to avoid user's questions if (requireLogin != null && requireLogin.booleanValue()) { allowAnonymous = Boolean.FALSE; @@ -603,15 +605,7 @@ public class Create extends InputAbstract { filters.put("${shared-store.settings}", ""); } - boolean aio; - - if (IS_WINDOWS || !supportsLibaio()) { - aio = false; - filters.put("${journal.settings}", "NIO"); - } else { - aio = true; - filters.put("${journal.settings}", "ASYNCIO"); - } + filters.put("${journal.settings}", journalType.name()); if (sslKey != null) { filters.put("${web.protocol}", "https"); @@ -761,7 +755,7 @@ public class Create extends InputAbstract { filters.put("${auto-create}", isAutoCreate() ? "true" : "false"); - performAutoTune(filters, aio, dataFolder); + performAutoTune(filters, journalType, dataFolder); write(ETC_BROKER_XML, filters, false); write(ETC_ARTEMIS_USERS_PROPERTIES, filters, false); @@ -802,6 +796,38 @@ public class Create extends InputAbstract { return null; } + private void setupJournalType() { + int countJournalTypes = countBoolean(aio, nio); + if (countJournalTypes > 1) { + throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped)."); + } + + if (countJournalTypes == 0) { + if (supportsLibaio()) { + aio = true; + } else { + nio = true; + } + } + + if (aio) { + journalType = JournalType.ASYNCIO; + } else if (nio) { + journalType = JournalType.NIO; + } + } + + + private static int countBoolean(boolean...b) { + int count = 0; + + for (boolean itemB : b) { + if (itemB) count++; + } + + return count; + } + private String getLogManager() throws IOException { String logManager = ""; File dir = new File(path(getHome().toString(), false) + "/lib"); @@ -858,7 +884,7 @@ public class Create extends InputAbstract { filters.put("${address-queue.settings}", writer.toString()); } - private void performAutoTune(HashMap filters, boolean aio, File dataFolder) { + private void performAutoTune(HashMap filters, JournalType journalType, File dataFolder) { if (noAutoTune) { filters.put("${journal-buffer.settings}", ""); } else { @@ -867,7 +893,7 @@ public class Create extends InputAbstract { System.out.println(""); System.out.println("Auto tuning journal ..."); - long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, aio); + long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, journalType); long nanoseconds = SyncCalculation.toNanos(time, writes, verbose); double writesPerMillisecond = (double) writes / (double) time; @@ -891,13 +917,10 @@ public class Create extends InputAbstract { } public boolean supportsLibaio() { - if (forceLibaio) { - // forcing libaio - return true; - } else if (forceNIO) { - // forcing NIO + if (IS_WINDOWS) { return false; - } else if (LibaioContext.isLoaded()) { + } + if (LibaioContext.isLoaded()) { try (LibaioContext context = new LibaioContext(1, true, true)) { File tmpFile = new File(directory, "validateAIO.bin"); boolean supportsLibaio = true; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/SyncRecalc.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java similarity index 57% rename from artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/SyncRecalc.java rename to artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java index 3b0bc3d238..f7d89ecce5 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/SyncRecalc.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PerfJournal.java @@ -20,13 +20,35 @@ package org.apache.activemq.artemis.cli.commands.tools; import java.text.DecimalFormat; import io.airlift.airline.Command; +import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.core.server.JournalType; -@Command(name = "sync", description = "Calculates the journal-buffer-timeout you should use with the current data folder") -public class SyncRecalc extends LockAbstract { +@Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder") +public class PerfJournal extends LockAbstract { + + + @Option(name = "--block-size", description = "The block size for each write (default 4096)") + public int size = 4 * 1024; + + + @Option(name = "--writes", description = "The number of writes to be performed (default 250)") + public int writes = 250; + + @Option(name = "--tries", description = "The number of tries for the test (default 5)") + public int tries = 5; + + @Option(name = "--no-sync", description = "Disable sync") + public boolean nosyncs = false; + + @Option(name = "--sync", description = "Enable syncs") + public boolean syncs = false; + + @Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)") + public String journalType = null; + @Override public Object execute(ActionContext context) throws Exception { @@ -34,11 +56,26 @@ public class SyncRecalc extends LockAbstract { FileConfiguration fileConfiguration = getFileConfiguration(); - int writes = 250; + if (nosyncs) { + fileConfiguration.setJournalDatasync(false); + } else if (syncs) { + fileConfiguration.setJournalDatasync(true); + } + + + if (journalType != null) { + fileConfiguration.setJournalType(JournalType.getType(journalType)); + } + System.out.println(""); System.out.println("Auto tuning journal ..."); - long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), 4096, writes, 5, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType() == JournalType.ASYNCIO); + System.out.println("Performing " + tries + " tests writing " + writes + " blocks of " + size + " on each test, sync=" + fileConfiguration.isJournalDatasync() + " with journalType = " + fileConfiguration.getJournalType()); + + fileConfiguration.getJournalLocation().mkdirs(); + + long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType()); + long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose); double writesPerMillisecond = (double) writes / (double) time; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java index 52ef87b0d2..ae7a8ca389 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java @@ -28,6 +28,8 @@ import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -47,8 +49,8 @@ public class SyncCalculation { int tries, boolean verbose, boolean fsync, - boolean aio) throws Exception { - SequentialFileFactory factory = newFactory(datafolder, fsync, aio); + JournalType journalType) throws Exception { + SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks); if (verbose) { System.out.println("Using " + factory.getClass().getName() + " to calculate sync times"); @@ -61,6 +63,8 @@ public class SyncCalculation { file.fill(blockSize * blocks); + file.close(); + long[] result = new long[tries]; byte[] block = new byte[blockSize]; @@ -94,6 +98,7 @@ public class SyncCalculation { System.out.println("**************************************************"); System.out.println(ntry + " of " + tries + " calculation"); } + file.open(); file.position(0); long start = System.currentTimeMillis(); for (int i = 0; i < blocks; i++) { @@ -115,6 +120,7 @@ public class SyncCalculation { System.out.println("bufferTimeout = " + toNanos(result[ntry], blocks, verbose)); System.out.println("**************************************************"); } + file.close(); } factory.releaseDirectBuffer(bufferBlock); @@ -162,17 +168,26 @@ public class SyncCalculation { return timeWait; } - private static SequentialFileFactory newFactory(File datafolder, boolean datasync, boolean aio) { - if (aio && LibaioContext.isLoaded()) { - SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync); - factory.start(); - ((AIOSequentialFileFactory) factory).disableBufferReuse(); + private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize) { + SequentialFileFactory factory; - return factory; - } else { - SequentialFileFactory factory = new NIOSequentialFileFactory(datafolder, 1); - factory.start(); - return factory; + if (journalType == JournalType.ASYNCIO && !LibaioContext.isLoaded()) { + journalType = JournalType.NIO; + } + + switch (journalType) { + + case NIO: + factory = new NIOSequentialFileFactory(datafolder, 1); + factory.start(); + return factory; + case ASYNCIO: + factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync); + factory.start(); + ((AIOSequentialFileFactory) factory).disableBufferReuse(); + return factory; + default: + throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(journalType); } } } diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index a8bd54179f..12af0929a8 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.impl.FileConfiguration; +import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; @@ -114,7 +115,7 @@ public class ArtemisTest extends CliTestBase { public void testSync() throws Exception { int writes = 20; int tries = 10; - long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true); + long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, JournalType.NIO); System.out.println(); System.out.println("TotalAvg = " + totalAvg); long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false); @@ -130,6 +131,19 @@ public class ArtemisTest extends CliTestBase { Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync"); } + + @Test + public void testSimpleCreateMapped() throws Throwable { + try { + //instance1: default using http + File instance1 = new File(temporaryFolder.getRoot(), "instance1"); + Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--mapped"); + } catch (Throwable e) { + e.printStackTrace(); + throw e; + } + } + @Test public void testWebConfig() throws Exception { setupAuth(); @@ -500,6 +514,20 @@ public class ArtemisTest extends CliTestBase { } + @Test + public void testPerfJournal() throws Exception { + File instanceFolder = temporaryFolder.newFolder("server1"); + setupAuth(instanceFolder); + + Run.setEmbedded(true); + Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--no-autotune", "--require-login"); + System.setProperty("artemis.instance", instanceFolder.getAbsolutePath()); + + Artemis.main("perf-journal", "--journal-type", "NIO", "--writes", "5000", "--tries", "50", "--verbose"); + + } + + public void testSimpleRun(String folderName) throws Exception { File instanceFolder = temporaryFolder.newFolder(folderName); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java index 69ff11a321..85c4877804 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java @@ -118,11 +118,6 @@ public class JDBCSequentialFile implements SequentialFile { return writePosition + size <= dbDriver.getMaxSize(); } - @Override - public int getAlignment() throws Exception { - return 0; - } - @Override public int calculateBlockStart(int position) throws Exception { return 0; diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index 4b92c7100e..fa88a85623 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -176,6 +176,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM return 1; } + @Override + public JDBCSequentialFileFactory setAlignment(int alignment) { + // no op + return this; + } + @Override public int calculateBlockSize(final int bytes) { return bytes; diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java index 0aaa1a6415..bc288db496 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/persistence/impl/journal/JMSJournalStorageManagerImpl.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.jms.persistence.impl.journal; import java.io.File; import java.util.ArrayList; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -78,8 +79,9 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager { final IDGenerator idGenerator, final Configuration config, final ReplicationManager replicator) { - if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO) { - throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals"); + final EnumSet supportedJournalTypes = EnumSet.allOf(JournalType.class); + if (!supportedJournalTypes.contains(config.getJournalType())) { + throw new IllegalArgumentException("Only " + supportedJournalTypes + " are supported Journal types"); } this.config = config; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java index 5aa723d86c..4310e84df2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -54,6 +54,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac protected boolean dataSync = true; + protected volatile int alignment = -1; + private final IOCriticalErrorListener critialErrorListener; /** @@ -83,6 +85,20 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac this.maxIO = maxIO; } + @Override + public int getAlignment() { + if (alignment < 0) { + alignment = 1; + } + return alignment; + } + + @Override + public AbstractSequentialFileFactory setAlignment(int alignment) { + this.alignment = alignment; + return this; + } + @Override public SequentialFileFactory setDatasync(boolean enabled) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java index 8f7cfb5ad4..53c989055b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFile.java @@ -43,8 +43,6 @@ public interface SequentialFile { boolean fits(int size); - int getAlignment() throws Exception; - int calculateBlockStart(int position) throws Exception; String getFileName(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java index 2229edf2f9..c8277e3b39 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java @@ -79,6 +79,8 @@ public interface SequentialFileFactory { int getAlignment(); + SequentialFileFactory setAlignment(int alignment); + int calculateBlockSize(int bytes); File getDirectory(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 874e411d63..21f5e0dac7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -78,17 +78,9 @@ public class AIOSequentialFile extends AbstractSequentialFile { return opened; } - @Override - public int getAlignment() { - // TODO: get the alignment from the file system, but we have to cache this, we can't call it every time - /* checkOpened(); - return aioFile.getBlockSize(); */ - return 512; - } - @Override public int calculateBlockStart(final int position) { - int alignment = getAlignment(); + int alignment = factory.getAlignment(); int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index 57d18f5c18..e7b0dfaffd 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -143,13 +143,13 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public ByteBuffer allocateDirectBuffer(final int size) { - int blocks = size / 512; - if (size % 512 != 0) { + int blocks = size / getAlignment(); + if (size % getAlignment() != 0) { blocks++; } - // The buffer on AIO has to be a multiple of 512 - ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * 512, 512); + // The buffer on AIO has to be a multiple of getAlignment() + ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * getAlignment(), getAlignment()); buffer.limit(size); @@ -163,8 +163,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public ByteBuffer newBuffer(int size) { - if (size % 512 != 0) { - size = (size / 512 + 1) * 512; + if (size % getAlignment() != 0) { + size = (size / getAlignment() + 1) * getAlignment(); } return buffersControl.newBuffer(size); @@ -178,7 +178,26 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public int getAlignment() { - return 512; + if (alignment < 0) { + + File checkFile = null; + + try { + journalDir.mkdirs(); + checkFile = File.createTempFile("journalCheck", ".tmp", journalDir); + checkFile.mkdirs(); + checkFile.createNewFile(); + alignment = LibaioContext.getBlockSize(checkFile); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + alignment = 512; + } finally { + if (checkFile != null) { + checkFile.delete(); + } + } + } + return alignment; } // For tests only @@ -399,7 +418,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor // if a buffer is bigger than the configured-bufferSize, we just create a new // buffer. if (size > bufferSize) { - return LibaioContext.newAlignedBuffer(size, 512); + return LibaioContext.newAlignedBuffer(size, getAlignment()); } else { // We need to allocate buffers following the rules of the storage // being used (AIO/NIO) @@ -410,7 +429,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor if (buffer == null) { // if empty create a new one. - buffer = LibaioContext.newAlignedBuffer(size, 512); + buffer = LibaioContext.newAlignedBuffer(size, getAlignment()); buffer.limit(alignedSize); } else { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java index 017948bcbe..a9591136ae 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java @@ -113,11 +113,6 @@ final class MappedSequentialFile implements SequentialFile { return hasRemaining; } - @Override - public int getAlignment() { - return 0; - } - @Override public int calculateBlockStart(int position) { return position; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index 8ccef74bc1..55bb2bf72c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -161,6 +161,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory return 1; } + @Override + public SequentialFileFactory setAlignment(int alignment) { + // no op + return this; + } + @Override public int calculateBlockSize(int bytes) { return bytes; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 2887d25fe2..29e5b813e8 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -53,11 +53,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile { defaultMaxIO = maxIO; } - @Override - public int getAlignment() { - return 1; - } - @Override public int calculateBlockStart(final int position) { return position; diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java index 43d80eab46..dc8644966c 100644 --- a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioFile.java @@ -39,9 +39,7 @@ public final class LibaioFile implements AutoClosea } public int getBlockSize() { - return 512; - // FIXME - //return LibaioContext.getBlockSizeFD(fd); + return LibaioContext.getBlockSizeFD(fd); } public boolean lock() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 8218026796..7881470e2d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -563,7 +563,7 @@ public interface Configuration { Configuration setJournalDirectory(String dir); /** - * Returns the type of journal used by this server (either {@code NIO} or {@code ASYNCIO}). + * Returns the type of journal used by this server ({@code NIO}, {@code ASYNCIO} or {@code MAPPED}). *
* Default value is ASYNCIO. */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java index 4d688c8889..b4f879eb38 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.config.impl; +import java.util.EnumSet; + import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; import org.apache.activemq.artemis.core.server.JournalType; @@ -125,7 +127,7 @@ public final class Validators { @Override public void validate(final String name, final Object value) { String val = (String) value; - if (val == null || !val.equals(JournalType.NIO.toString()) && !val.equals(JournalType.ASYNCIO.toString())) { + if (val == null || !EnumSet.allOf(JournalType.class).contains(JournalType.valueOf(val))) { throw ActiveMQMessageBundle.BUNDLE.invalidJournalType(val); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index d58941691b..6211a1b5a8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -497,22 +497,19 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { String s = getString(e, "journal-type", config.getJournalType().toString(), Validators.JOURNAL_TYPE); - if (s.equals(JournalType.NIO.toString())) { - config.setJournalType(JournalType.NIO); - } else if (s.equals(JournalType.ASYNCIO.toString())) { + config.setJournalType(JournalType.getType(s)); + + if (config.getJournalType() == JournalType.ASYNCIO) { // https://jira.jboss.org/jira/browse/HORNETQ-295 // We do the check here to see if AIO is supported so we can use the correct defaults and/or use // correct settings in xml // If we fall back later on these settings can be ignored boolean supportsAIO = AIOSequentialFileFactory.isSupported(); - if (supportsAIO) { - config.setJournalType(JournalType.ASYNCIO); - } else { + if (!supportsAIO) { if (validateAIO) { ActiveMQServerLogger.LOGGER.AIONotFound(); } - config.setJournalType(JournalType.NIO); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index a1346ec32d..7c0a6510db 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -21,6 +21,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -113,7 +114,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { @Override protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) { - if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO) { + if (!EnumSet.allOf(JournalType.class).contains(config.getJournalType())) { throw ActiveMQMessageBundle.BUNDLE.invalidJournal(); } @@ -125,21 +126,23 @@ public class JournalStorageManager extends AbstractJournalStorageManager { bindingsJournal = localBindings; originalBindingsJournal = localBindings; - if (config.getJournalType() == JournalType.ASYNCIO) { - ActiveMQServerLogger.LOGGER.journalUseAIO(); + switch (config.getJournalType()) { - journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener); - } else if (config.getJournalType() == JournalType.NIO) { - ActiveMQServerLogger.LOGGER.journalUseNIO(); - journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener); - } else { - throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); + case NIO: + ActiveMQServerLogger.LOGGER.journalUseNIO(); + journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener); + break; + case ASYNCIO: + ActiveMQServerLogger.LOGGER.journalUseAIO(); + journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener); + break; + default: + throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); } journalFF.setDatasync(config.isJournalDatasync()); - - Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0); + Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0); messageJournal = localMessage; originalMessageJournal = localMessage; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java index ec5a9641f9..2716a384e3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java @@ -16,6 +16,32 @@ */ package org.apache.activemq.artemis.core.server; + public enum JournalType { - NIO, ASYNCIO; + NIO, ASYNCIO, MAPPED; + + public static final String validValues; + + static { + StringBuffer stringBuffer = new StringBuffer(); + for (JournalType type : JournalType.values()) { + + if (stringBuffer.length() != 0) { + stringBuffer.append(","); + } + + stringBuffer.append(type.name()); + } + + validValues = stringBuffer.toString(); + } + + public static JournalType getType(String type) { + switch (type) { + case "NIO": return NIO; + case "ASYNCIO" : return ASYNCIO; + default: throw new IllegalStateException("Invalid JournalType:" + type + " valid Types: " + validValues); + } + } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOSequentialFileFactoryTest.java index fd19727373..4e7f315698 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOSequentialFileFactoryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOSequentialFileFactoryTest.java @@ -44,7 +44,7 @@ public class AIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase SequentialFile file = factory.createSequentialFile("filtetmp.log"); file.open(); ByteBuffer buff = factory.newBuffer(10); - Assert.assertEquals(512, buff.limit()); + Assert.assertEquals(factory.getAlignment(), buff.limit()); file.close(); factory.releaseBuffer(buff); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOImportExportTest.java index 4521d976e1..d108958e23 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOImportExportTest.java @@ -45,7 +45,7 @@ public class NIOImportExportTest extends JournalImplTestBase { @Test public void testExportImport() throws Exception { - setup(10, 10 * 1024, true); + setup(10, 10 * 4096, true); createJournal(); @@ -99,7 +99,7 @@ public class NIOImportExportTest extends JournalImplTestBase { @Test public void testExportImport3() throws Exception { - setup(10, 10 * 1024, true); + setup(10, 10 * 4096, true); createJournal(); @@ -162,7 +162,7 @@ public class NIOImportExportTest extends JournalImplTestBase { @Test public void testExportImport2() throws Exception { - setup(10, 10 * 1024, true); + setup(10, 10 * 4096, true); createJournal(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java index 42c48f3fa1..38cc126d72 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/NIOJournalCompactTest.java @@ -479,7 +479,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase { performNonTransactionalDelete = false; } - setup(2, 60 * 1024, false); + setup(2, 60 * 4096, false); ArrayList liveIDs = new ArrayList<>(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/CleanBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/CleanBufferTest.java index 48cba761fc..b92240b010 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/CleanBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/CleanBufferTest.java @@ -50,7 +50,7 @@ public class CleanBufferTest extends ActiveMQTestBase { @Test public void testCleanOnAIO() { if (LibaioContext.isLoaded()) { - SequentialFileFactory factory = new AIOSequentialFileFactory(new File("Whatever"), 50); + SequentialFileFactory factory = new AIOSequentialFileFactory(new File("./target"), 50); testBuffer(factory); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index 3be030d3ec..d1536d48c8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -39,6 +39,9 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { @Override @After public void tearDown() throws Exception { + //stop journal first to let it manage its files + stopComponent(journal); + List files = fileFactory.listFiles(fileExtension); for (String file : files) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java index da7fc782cb..d27aa9b25c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java @@ -132,11 +132,11 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase { try { - checkFill(sf, 2048); + checkFill(factory, sf, 2048); - checkFill(sf, 512); + checkFill(factory, sf, 512); - checkFill(sf, 512 * 4); + checkFill(factory, sf, 512 * 4); } finally { sf.close(); } @@ -226,19 +226,19 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase { sf.write(bb1, true); long bytesWritten = sf.position() - initialPos; - Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten); + Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesWritten); initialPos = sf.position(); sf.write(bb2, true); bytesWritten = sf.position() - initialPos; - Assert.assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten); + Assert.assertEquals(calculateRecordSize(bytes2.length, factory.getAlignment()), bytesWritten); initialPos = sf.position(); sf.write(bb3, true); bytesWritten = sf.position() - initialPos; - Assert.assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten); + Assert.assertEquals(calculateRecordSize(bytes3.length, factory.getAlignment()), bytesWritten); sf.position(0); @@ -247,20 +247,20 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase { ByteBuffer rb3 = factory.newBuffer(bytes3.length); int bytesRead = sf.read(rb1); - Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesRead); + Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesRead); for (int i = 0; i < bytes1.length; i++) { Assert.assertEquals(bytes1[i], rb1.get(i)); } bytesRead = sf.read(rb2); - Assert.assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesRead); + Assert.assertEquals(calculateRecordSize(bytes2.length, factory.getAlignment()), bytesRead); for (int i = 0; i < bytes2.length; i++) { Assert.assertEquals(bytes2[i], rb2.get(i)); } bytesRead = sf.read(rb3); - Assert.assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesRead); + Assert.assertEquals(calculateRecordSize(bytes3.length, factory.getAlignment()), bytesRead); for (int i = 0; i < bytes3.length; i++) { Assert.assertEquals(bytes3[i], rb3.get(i)); } @@ -291,19 +291,19 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase { sf.write(wrapBuffer(bytes1), true); long bytesWritten = sf.position() - initialPos; - Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten); + Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesWritten); initialPos = sf.position(); sf.write(wrapBuffer(bytes2), true); bytesWritten = sf.position() - initialPos; - Assert.assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten); + Assert.assertEquals(calculateRecordSize(bytes2.length, factory.getAlignment()), bytesWritten); initialPos = sf.position(); sf.write(wrapBuffer(bytes3), true); bytesWritten = sf.position() - initialPos; - Assert.assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten); + Assert.assertEquals(calculateRecordSize(bytes3.length, factory.getAlignment()), bytesWritten); byte[] rbytes1 = new byte[bytes1.length]; @@ -315,7 +315,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase { ByteBuffer rb2 = factory.newBuffer(rbytes2.length); ByteBuffer rb3 = factory.newBuffer(rbytes3.length); - sf.position(calculateRecordSize(bytes1.length, sf.getAlignment()) + calculateRecordSize(bytes2.length, sf.getAlignment())); + sf.position(calculateRecordSize(bytes1.length, factory.getAlignment()) + calculateRecordSize(bytes2.length, factory.getAlignment())); int bytesRead = sf.read(rb3); Assert.assertEquals(rb3.limit(), bytesRead); @@ -361,7 +361,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase { sf.write(wrapBuffer(bytes1), true); long bytesWritten = sf.position() - initialPos; - Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten); + Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesWritten); sf.close(); @@ -386,7 +386,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase { return ActiveMQBuffers.wrappedBuffer(bytes); } - protected void checkFill(final SequentialFile file, final int size) throws Exception { + protected void checkFill(final SequentialFileFactory factory, final SequentialFile file, final int size) throws Exception { file.fill(size); file.close(); @@ -399,7 +399,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase { int bytesRead = file.read(bb); - Assert.assertEquals(calculateRecordSize(size, file.getAlignment()), bytesRead); + Assert.assertEquals(calculateRecordSize(size, factory.getAlignment()), bytesRead); for (int i = 0; i < size; i++) { // log.debug(" i is " + i); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java index 0316945bc3..1f933fbda3 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java @@ -37,7 +37,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory { private final Map fileMap = new ConcurrentHashMap<>(); - private final int alignment; + private volatile int alignment; private final boolean supportsCallback; @@ -198,6 +198,12 @@ public class FakeSequentialFileFactory implements SequentialFileFactory { return alignment; } + @Override + public FakeSequentialFileFactory setAlignment(int alignment) { + this.alignment = alignment; + return this; + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- @@ -462,11 +468,6 @@ public class FakeSequentialFileFactory implements SequentialFileFactory { } } - @Override - public int getAlignment() throws Exception { - return alignment; - } - @Override public int calculateBlockStart(final int position) throws Exception { int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; From ef8cb60df718049af827b61108316ddb306cfbdd Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 2 Feb 2017 21:07:06 -0500 Subject: [PATCH 2/4] NO-JIRA Fixing deadlock on JDBCJournal::stop / sync methods --- .../artemis/jdbc/store/journal/JDBCJournalImpl.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index f7187854b6..dbab0e6cd7 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -74,8 +74,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { private final Executor completeExecutor; - private final Object journalLock = new Object(); - private final ScheduledExecutorService scheduledExecutorService; // Track Tx Records @@ -135,11 +133,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @Override public synchronized void stop() throws SQLException { if (started) { - synchronized (journalLock) { - sync(); - started = false; - super.stop(); - } + sync(); + started = false; + super.stop(); } } @@ -305,7 +301,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { record.setIoCompletion(callback); } - synchronized (journalLock) { + synchronized (this) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { addTxRecord(record); } From aacddfda61804b203dc8b3efdebafa9384662e22 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Fri, 16 Dec 2016 11:12:22 +0100 Subject: [PATCH 3/4] ARTEMIS-906 Memory Mapped JournalType --- .../activemq/artemis/cli/commands/Create.java | 7 +- .../cli/commands/util/SyncCalculation.java | 12 + .../UnpooledUnsafeDirectByteBufWrapper.java | 371 +++++++++++++++++ .../artemis/core/io/mapped/MappedFile.java | 95 +++-- .../core/io/mapped/MappedSequentialFile.java | 92 ++--- .../mapped/MappedSequentialFileFactory.java | 75 ++-- .../core/io/mapped/TimedSequentialFile.java | 377 ++++++++++++++++++ .../artemis/core/io/JournalTptBenchmark.java | 208 ++++++++++ .../core/io/SequentialFileTptBenchmark.java | 203 ++++++++++ .../impl/journal/JournalStorageManager.java | 6 + .../core/server/ActiveMQServerLogger.java | 6 +- .../artemis/core/server/JournalType.java | 1 + .../schema/artemis-configuration.xsd | 1 + .../test/resources/artemis-configuration.xsd | 1 + .../journal/AIOJournalImplTest.java | 3 +- .../journal/MappedImportExportTest.java | 30 ++ .../journal/MappedJournalCompactTest.java | 37 ++ .../journal/MappedJournalImplTest.java | 43 ++ .../MappedSequentialFileFactoryTest.java | 184 +++++++++ .../ValidateTransactionHealthTest.java | 37 +- 20 files changed, 1667 insertions(+), 122 deletions(-) create mode 100644 artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java create mode 100644 artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java create mode 100644 artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java create mode 100644 artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index 29635e6163..cd42f808d2 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -216,6 +216,9 @@ public class Create extends InputAbstract { @Option(name = "--nio", description = "sets the journal as nio.") boolean nio; + @Option(name = "--mapped", description = "Sets the journal as mapped.") + boolean mapped; + // this is used by the setupJournalType method private JournalType journalType; @@ -797,7 +800,7 @@ public class Create extends InputAbstract { } private void setupJournalType() { - int countJournalTypes = countBoolean(aio, nio); + int countJournalTypes = countBoolean(aio, nio, mapped); if (countJournalTypes > 1) { throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped)."); } @@ -814,6 +817,8 @@ public class Create extends InputAbstract { journalType = JournalType.ASYNCIO; } else if (nio) { journalType = JournalType.NIO; + } else if (mapped) { + journalType = JournalType.MAPPED; } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java index ae7a8ca389..656327862e 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java @@ -24,9 +24,11 @@ import java.text.DecimalFormat; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.JournalType; @@ -186,6 +188,16 @@ public class SyncCalculation { factory.start(); ((AIOSequentialFileFactory) factory).disableBufferReuse(); return factory; + case MAPPED: + factory = new MappedSequentialFileFactory(datafolder, new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + + } + }, true).chunkBytes(fileSize).overlapBytes(0).setDatasync(datasync); + + factory.start(); + return factory; default: throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(journalType); } diff --git a/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java new file mode 100644 index 0000000000..0da33c68ea --- /dev/null +++ b/artemis-journal/src/main/java/io/netty/buffer/UnpooledUnsafeDirectByteBufWrapper.java @@ -0,0 +1,371 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.netty.buffer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; + +import io.netty.util.internal.PlatformDependent; + +/** + * A NIO direct {@link ByteBuffer} wrapper. + * Only ByteBuffer's manipulation operations are supported. + * Is best suited only for encoding/decoding purposes. + */ +public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf { + + private ByteBuffer buffer; + private long memoryAddress; + + /** + * Creates a new direct buffer by wrapping the specified initial buffer. + */ + public UnpooledUnsafeDirectByteBufWrapper() { + super(0); + this.buffer = null; + this.memoryAddress = 0L; + } + + public void wrap(ByteBuffer buffer, int srcIndex, int length) { + if (buffer != null) { + this.buffer = buffer; + this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex; + clear(); + maxCapacity(length); + } else { + reset(); + } + } + + public void reset() { + this.buffer = null; + this.memoryAddress = 0L; + clear(); + maxCapacity(0); + } + + @Override + public boolean isDirect() { + return true; + } + + @Override + public int capacity() { + return maxCapacity(); + } + + @Override + public ByteBuf capacity(int newCapacity) { + if (newCapacity != maxCapacity()) { + throw new IllegalArgumentException("can't set a capacity different from the max allowed one"); + } + return this; + } + + @Override + public ByteBufAllocator alloc() { + return null; + } + + @Override + public ByteOrder order() { + return ByteOrder.BIG_ENDIAN; + } + + @Override + public boolean hasArray() { + return false; + } + + @Override + public byte[] array() { + throw new UnsupportedOperationException("direct buffer"); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException("direct buffer"); + } + + @Override + public boolean hasMemoryAddress() { + return true; + } + + @Override + public long memoryAddress() { + return memoryAddress; + } + + @Override + protected byte _getByte(int index) { + return UnsafeByteBufUtil.getByte(addr(index)); + } + + @Override + protected short _getShort(int index) { + return UnsafeByteBufUtil.getShort(addr(index)); + } + + @Override + protected short _getShortLE(int index) { + return UnsafeByteBufUtil.getShortLE(addr(index)); + } + + @Override + protected int _getUnsignedMedium(int index) { + return UnsafeByteBufUtil.getUnsignedMedium(addr(index)); + } + + @Override + protected int _getUnsignedMediumLE(int index) { + return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index)); + } + + @Override + protected int _getInt(int index) { + return UnsafeByteBufUtil.getInt(addr(index)); + } + + @Override + protected int _getIntLE(int index) { + return UnsafeByteBufUtil.getIntLE(addr(index)); + } + + @Override + protected long _getLong(int index) { + return UnsafeByteBufUtil.getLong(addr(index)); + } + + @Override + protected long _getLongLE(int index) { + return UnsafeByteBufUtil.getLongLE(addr(index)); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + UnsafeByteBufUtil.getBytes(this, addr(index), index, dst); + return this; + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + int length = dst.remaining(); + checkReadableBytes(length); + getBytes(readerIndex, dst); + readerIndex += length; + return this; + } + + @Override + protected void _setByte(int index, int value) { + UnsafeByteBufUtil.setByte(addr(index), value); + } + + @Override + protected void _setShort(int index, int value) { + UnsafeByteBufUtil.setShort(addr(index), value); + } + + @Override + protected void _setShortLE(int index, int value) { + UnsafeByteBufUtil.setShortLE(addr(index), value); + } + + @Override + protected void _setMedium(int index, int value) { + UnsafeByteBufUtil.setMedium(addr(index), value); + } + + @Override + protected void _setMediumLE(int index, int value) { + UnsafeByteBufUtil.setMediumLE(addr(index), value); + } + + @Override + protected void _setInt(int index, int value) { + UnsafeByteBufUtil.setInt(addr(index), value); + } + + @Override + protected void _setIntLE(int index, int value) { + UnsafeByteBufUtil.setIntLE(addr(index), value); + } + + @Override + protected void _setLong(int index, long value) { + UnsafeByteBufUtil.setLong(addr(index), value); + } + + @Override + protected void _setLongLE(int index, long value) { + UnsafeByteBufUtil.setLongLE(addr(index), value); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + UnsafeByteBufUtil.setBytes(this, addr(index), index, src); + return this; + } + + @Override + @Deprecated + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + @Deprecated + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + @Deprecated + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + @Deprecated + public int readBytes(GatheringByteChannel out, int length) throws IOException { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + @Deprecated + public int readBytes(FileChannel out, long position, int length) throws IOException { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + @Deprecated + public int setBytes(int index, InputStream in, int length) throws IOException { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + @Deprecated + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + @Deprecated + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + public int nioBufferCount() { + return 1; + } + + @Override + @Deprecated + public ByteBuffer[] nioBuffers(int index, int length) { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + @Deprecated + public ByteBuf copy(int index, int length) { + + throw new UnsupportedOperationException("unsupported!"); + + } + + @Override + @Deprecated + public ByteBuffer internalNioBuffer(int index, int length) { + throw new UnsupportedOperationException("cannot access directly the wrapped buffer!"); + } + + @Override + @Deprecated + public ByteBuffer nioBuffer(int index, int length) { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + @Deprecated + protected void deallocate() { + //NO_OP + } + + @Override + public ByteBuf unwrap() { + return null; + } + + private long addr(int index) { + return memoryAddress + index; + } + + @Override + @Deprecated + protected SwappedByteBuf newSwappedByteBuf() { + throw new UnsupportedOperationException("unsupported!"); + } + + @Override + public ByteBuf setZero(int index, int length) { + UnsafeByteBufUtil.setZero(this, addr(index), index, length); + return this; + } + + @Override + public ByteBuf writeZero(int length) { + ensureWritable(length); + int wIndex = writerIndex; + setZero(wIndex, length); + writerIndex = wIndex + length; + return this; + } +} + diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java index 0aa98669a2..adfc4fef7b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java @@ -20,18 +20,19 @@ import java.io.File; import java.io.IOException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.nio.MappedByteBuffer; import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper; import io.netty.util.internal.PlatformDependent; +import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.journal.EncodingSupport; final class MappedFile implements AutoCloseable { - private static final ByteBuffer ZERO_PAGE = ByteBuffer.allocateDirect(MappedByteBufferCache.PAGE_SIZE).order(ByteOrder.nativeOrder()); - private final MappedByteBufferCache cache; - private final int zerosMaxPage; + private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper; + private final ChannelBufferWrapper channelBufferWrapper; private MappedByteBuffer lastMapped; private long lastMappedStart; private long lastMappedLimit; @@ -45,7 +46,8 @@ final class MappedFile implements AutoCloseable { this.lastMappedLimit = -1; this.position = 0; this.length = this.cache.fileSize(); - this.zerosMaxPage = Math.min(ZERO_PAGE.capacity(), (int) Math.min(Integer.MAX_VALUE, cache.overlapBytes())); + this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper(); + this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false); } public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException { @@ -58,29 +60,33 @@ final class MappedFile implements AutoCloseable { private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException { if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) { - try { - final int index = cache.indexFor(offset); - final long mappedPosition = cache.mappedPositionFor(index); - final long mappedLimit = cache.mappedLimitFor(mappedPosition); - if (offset + bytes > mappedLimit) { - throw new IOException("mapping overflow!"); - } - lastMapped = cache.acquireMappedByteBuffer(index); - lastMappedStart = mappedPosition; - lastMappedLimit = mappedLimit; - final int bufferPosition = (int) (offset - mappedPosition); - return bufferPosition; - } catch (IllegalStateException e) { - throw new IOException(e); - } catch (IllegalArgumentException e) { - throw new BufferUnderflowException(); - } + return updateOffset(offset, bytes); } else { final int bufferPosition = (int) (offset - lastMappedStart); return bufferPosition; } } + private int updateOffset(long offset, int bytes) throws BufferUnderflowException, IOException { + try { + final int index = cache.indexFor(offset); + final long mappedPosition = cache.mappedPositionFor(index); + final long mappedLimit = cache.mappedLimitFor(mappedPosition); + if (offset + bytes > mappedLimit) { + throw new IOException("mapping overflow!"); + } + lastMapped = cache.acquireMappedByteBuffer(index); + lastMappedStart = mappedPosition; + lastMappedLimit = mappedLimit; + final int bufferPosition = (int) (offset - mappedPosition); + return bufferPosition; + } catch (IllegalStateException e) { + throw new IOException(e); + } catch (IllegalArgumentException e) { + throw new BufferUnderflowException(); + } + } + public void force() { if (lastMapped != null) { lastMapped.force(); @@ -179,6 +185,26 @@ final class MappedFile implements AutoCloseable { return read; } + /** + * Writes an encoded sequence of bytes to this file from the given buffer. + *

+ *

Bytes are written starting at this file's current position, + */ + public void write(EncodingSupport encodingSupport) throws IOException { + final int encodedSize = encodingSupport.getEncodeSize(); + final int bufferPosition = checkOffset(position, encodedSize); + this.byteBufWrapper.wrap(this.lastMapped, bufferPosition, encodedSize); + try { + encodingSupport.encode(this.channelBufferWrapper); + } finally { + this.byteBufWrapper.reset(); + } + position += encodedSize; + if (position > this.length) { + this.length = position; + } + } + /** * Writes a sequence of bytes to this file from the given buffer. *

@@ -273,21 +299,20 @@ final class MappedFile implements AutoCloseable { *

Bytes are written starting at this file's current position, */ public void zeros(long offset, int count) throws IOException { - final long targetOffset = offset + count; - final int zerosBulkCopies = count / zerosMaxPage; - final long srcAddress = PlatformDependent.directBufferAddress(ZERO_PAGE); - for (int i = 0; i < zerosBulkCopies; i++) { - final int bufferPosition = checkOffset(offset, zerosMaxPage); + while (count > 0) { + //do not need to validate the bytes count + final int bufferPosition = checkOffset(offset, 0); + final int endZerosPosition = (int)Math.min((long)bufferPosition + count, lastMapped.capacity()); + final int zeros = endZerosPosition - bufferPosition; final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; - PlatformDependent.copyMemory(srcAddress, destAddress, zerosMaxPage); - offset += zerosMaxPage; + PlatformDependent.setMemory(destAddress, zeros, (byte) 0); + offset += zeros; + count -= zeros; + //TODO need to call force on each write? + //this.force(); } - final int remainingToBeZeroes = (int) (targetOffset - offset); - final int bufferPosition = checkOffset(offset, remainingToBeZeroes); - final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; - PlatformDependent.copyMemory(srcAddress, destAddress, remainingToBeZeroes); - if (targetOffset > this.length) { - this.length = targetOffset; + if (offset > this.length) { + this.length = offset; } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java index a9591136ae..12e359cfc3 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java @@ -20,16 +20,13 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -44,12 +41,11 @@ final class MappedSequentialFile implements SequentialFile { private final long chunkBytes; private final long overlapBytes; private final IOCriticalErrorListener criticalErrorListener; + private final MappedSequentialFileFactory factory; private File file; private File absoluteFile; private String fileName; private MappedFile mappedFile; - private ActiveMQBuffer pooledActiveMQBuffer; - private final MappedSequentialFileFactory factory; MappedSequentialFile(MappedSequentialFileFactory factory, final File directory, @@ -65,19 +61,24 @@ final class MappedSequentialFile implements SequentialFile { this.chunkBytes = chunkBytes; this.overlapBytes = overlapBytes; this.mappedFile = null; - this.pooledActiveMQBuffer = null; this.criticalErrorListener = criticalErrorListener; } private void checkIsOpen() { if (!isOpen()) { - throw new IllegalStateException("must be open!"); + throw new IllegalStateException("File not opened!"); + } + } + + private void checkIsOpen(IOCallback callback) { + if (!isOpen()) { + callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened"); } } private void checkIsNotOpen() { if (isOpen()) { - throw new IllegalStateException("must be closed!"); + throw new IllegalStateException("File opened!"); } } @@ -101,7 +102,6 @@ final class MappedSequentialFile implements SequentialFile { @Override public void open(int maxIO, boolean useExecutor) throws IOException { //ignore maxIO e useExecutor - ActiveMQJournalLogger.LOGGER.warn("ignoring maxIO and useExecutor unsupported parameters!"); this.open(); } @@ -134,7 +134,7 @@ final class MappedSequentialFile implements SequentialFile { @Override public void delete() { - checkIsNotOpen(); + close(); if (file.exists() && !file.delete()) { ActiveMQJournalLogger.LOGGER.errorDeletingFile(this); } @@ -142,10 +142,10 @@ final class MappedSequentialFile implements SequentialFile { @Override public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws IOException { - checkIsOpen(); if (callback == null) { throw new NullPointerException("callback parameter need to be set"); } + checkIsOpen(callback); try { final ByteBuf byteBuf = bytes.byteBuf(); final int writerIndex = byteBuf.writerIndex(); @@ -182,34 +182,16 @@ final class MappedSequentialFile implements SequentialFile { } } - private ActiveMQBuffer acquiresActiveMQBufferWithAtLeast(int size) { - if (this.pooledActiveMQBuffer == null || this.pooledActiveMQBuffer.capacity() < size) { - this.pooledActiveMQBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size).order(ByteOrder.nativeOrder())); - } else { - this.pooledActiveMQBuffer.clear(); - } - return pooledActiveMQBuffer; - } - @Override public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws IOException { - checkIsOpen(); if (callback == null) { throw new NullPointerException("callback parameter need to be set"); } + checkIsOpen(callback); try { - final int encodedSize = bytes.getEncodeSize(); - final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize); - bytes.encode(outBuffer); - final ByteBuf byteBuf = outBuffer.byteBuf(); - final int writerIndex = byteBuf.writerIndex(); - final int readerIndex = byteBuf.readerIndex(); - final int readableBytes = writerIndex - readerIndex; - if (readableBytes > 0) { - this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (factory.isDatasync() && sync) { - this.mappedFile.force(); - } + this.mappedFile.write(bytes); + if (factory.isDatasync() && sync) { + this.mappedFile.force(); } callback.done(); } catch (IOException e) { @@ -224,33 +206,26 @@ final class MappedSequentialFile implements SequentialFile { @Override public void write(EncodingSupport bytes, boolean sync) throws IOException { checkIsOpen(); - final int encodedSize = bytes.getEncodeSize(); - final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize); - bytes.encode(outBuffer); - final ByteBuf byteBuf = outBuffer.byteBuf(); - final int writerIndex = byteBuf.writerIndex(); - final int readerIndex = byteBuf.readerIndex(); - final int readableBytes = writerIndex - readerIndex; - if (readableBytes > 0) { - this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (factory.isDatasync() && sync) { - this.mappedFile.force(); - } + this.mappedFile.write(bytes); + if (factory.isDatasync() && sync) { + this.mappedFile.force(); } } @Override public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) { - checkIsOpen(); if (callback == null) { throw new NullPointerException("callback parameter need to be set"); } + checkIsOpen(callback); try { final int position = bytes.position(); final int limit = bytes.limit(); final int remaining = limit - position; if (remaining > 0) { this.mappedFile.write(bytes, position, remaining); + final int newPosition = position + remaining; + bytes.position(newPosition); if (factory.isDatasync() && sync) { this.mappedFile.force(); } @@ -273,6 +248,8 @@ final class MappedSequentialFile implements SequentialFile { final int remaining = limit - position; if (remaining > 0) { this.mappedFile.write(bytes, position, remaining); + final int newPosition = position + remaining; + bytes.position(newPosition); if (factory.isDatasync() && sync) { this.mappedFile.force(); } @@ -281,10 +258,10 @@ final class MappedSequentialFile implements SequentialFile { @Override public int read(ByteBuffer bytes, IOCallback callback) throws IOException { - checkIsOpen(); if (callback == null) { throw new NullPointerException("callback parameter need to be set"); } + checkIsOpen(callback); try { final int position = bytes.position(); final int limit = bytes.limit(); @@ -296,8 +273,10 @@ final class MappedSequentialFile implements SequentialFile { bytes.flip(); callback.done(); return bytesRead; + } else { + callback.done(); + return 0; } - return 0; } catch (IOException e) { if (this.criticalErrorListener != null) { this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); @@ -360,7 +339,14 @@ final class MappedSequentialFile implements SequentialFile { @Override public void renameTo(String newFileName) throws Exception { - checkIsNotOpen(); + try { + close(); + } catch (Exception e) { + if (e instanceof IOException) { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); + } + throw e; + } if (this.fileName == null) { this.fileName = this.file.getName(); } @@ -388,14 +374,10 @@ final class MappedSequentialFile implements SequentialFile { if (dstFile.isOpen()) { throw new IllegalArgumentException("dstFile must be closed too"); } - try (RandomAccessFile src = new RandomAccessFile(file, "rw"); - FileChannel srcChannel = src.getChannel(); - FileLock srcLock = srcChannel.lock()) { + try (RandomAccessFile src = new RandomAccessFile(file, "rw"); FileChannel srcChannel = src.getChannel(); FileLock srcLock = srcChannel.lock()) { final long readableBytes = srcChannel.size(); if (readableBytes > 0) { - try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw"); - FileChannel dstChannel = dst.getChannel(); - FileLock dstLock = dstChannel.lock()) { + try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw"); FileChannel dstChannel = dst.getChannel(); FileLock dstLock = dstChannel.lock()) { final long oldLength = dst.length(); final long newLength = oldLength + readableBytes; dst.setLength(newLength); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index 55bb2bf72c..c4b7d30c3d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -29,30 +28,42 @@ import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; public final class MappedSequentialFileFactory implements SequentialFileFactory { private static long DEFAULT_BLOCK_SIZE = 64L << 20; private final File directory; private final IOCriticalErrorListener criticalErrorListener; + private final TimedBuffer timedBuffer; private long chunkBytes; private long overlapBytes; private boolean useDataSync; + private boolean supportCallbacks; - public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) { + protected volatile int alignment = -1; + + public MappedSequentialFileFactory(File directory, + IOCriticalErrorListener criticalErrorListener, + boolean supportCallbacks) { this.directory = directory; this.criticalErrorListener = criticalErrorListener; this.chunkBytes = DEFAULT_BLOCK_SIZE; this.overlapBytes = DEFAULT_BLOCK_SIZE / 4; + this.useDataSync = true; + this.timedBuffer = null; + this.supportCallbacks = supportCallbacks; + } + + public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) { + this(directory, criticalErrorListener, false); } public MappedSequentialFileFactory(File directory) { - this.directory = directory; - this.criticalErrorListener = null; - this.chunkBytes = DEFAULT_BLOCK_SIZE; - this.overlapBytes = DEFAULT_BLOCK_SIZE / 4; + this(directory, null); } + public long chunkBytes() { return chunkBytes; } @@ -73,7 +84,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public SequentialFile createSequentialFile(String fileName) { - return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener); + final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener); + if (this.timedBuffer == null) { + return mappedSequentialFile; + } else { + return new TimedSequentialFile(this, mappedSequentialFile); + } } @Override @@ -89,17 +105,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public int getMaxIO() { - return 0; + return 1; } @Override public List listFiles(final String extension) throws Exception { - final FilenameFilter extensionFilter = new FilenameFilter() { - @Override - public boolean accept(final File file, final String name) { - return name.endsWith("." + extension); - } - }; + final FilenameFilter extensionFilter = (file, name) -> name.endsWith("." + extension); final String[] fileNames = directory.list(extensionFilter); if (fileNames == null) { return Collections.EMPTY_LIST; @@ -109,7 +120,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public boolean isSupportsCallbacks() { - return false; + return this.supportCallbacks; } @Override @@ -121,7 +132,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public ByteBuffer allocateDirectBuffer(final int size) { - return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder()); + return ByteBuffer.allocateDirect(size); } @Override @@ -131,7 +142,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public ByteBuffer newBuffer(final int size) { - return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder()); + return ByteBuffer.allocate(size); } @Override @@ -143,17 +154,23 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public void activateBuffer(SequentialFile file) { - + if (timedBuffer != null) { + file.setTimedBuffer(timedBuffer); + } } @Override public void deactivateBuffer() { - + if (timedBuffer != null) { + // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer + timedBuffer.flush(); + timedBuffer.setObserver(null); + } } @Override public ByteBuffer wrapBuffer(final byte[] bytes) { - return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()); + return ByteBuffer.wrap(bytes); } @Override @@ -162,8 +179,8 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory } @Override - public SequentialFileFactory setAlignment(int alignment) { - // no op + public MappedSequentialFileFactory setAlignment(int alignment) { + this.alignment = alignment; return this; } @@ -179,7 +196,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public void clearBuffer(final ByteBuffer buffer) { - buffer.clear(); if (buffer.isDirect()) { BytesUtils.zerosDirect(buffer); } else if (buffer.hasArray()) { @@ -193,16 +209,21 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory buffer.put(i, (byte) 0); } } + buffer.rewind(); } @Override public void start() { - + if (timedBuffer != null) { + timedBuffer.start(); + } } @Override public void stop() { - + if (timedBuffer != null) { + timedBuffer.stop(); + } } @Override @@ -215,6 +236,8 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public void flush() { - + if (timedBuffer != null) { + timedBuffer.flush(); + } } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java new file mode 100644 index 0000000000..d376d7df65 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.io.mapped; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.core.io.DummyCallback; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; +import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; + +final class TimedSequentialFile implements SequentialFile { + + private final SequentialFileFactory factory; + private final SequentialFile sequentialFile; + private final LocalBufferObserver observer; + private final ThreadLocal callbackPool; + private TimedBuffer timedBuffer; + + TimedSequentialFile(SequentialFileFactory factory, SequentialFile sequentialFile) { + this.sequentialFile = sequentialFile; + this.factory = factory; + this.observer = new LocalBufferObserver(); + this.callbackPool = ThreadLocal.withInitial(ResettableIOCallback::new); + } + + @Override + public boolean isOpen() { + return this.sequentialFile.isOpen(); + } + + @Override + public boolean exists() { + return this.sequentialFile.exists(); + } + + @Override + public void open() throws Exception { + this.sequentialFile.open(); + } + + @Override + public void open(int maxIO, boolean useExecutor) throws Exception { + this.sequentialFile.open(maxIO, useExecutor); + } + + @Override + public boolean fits(int size) { + if (timedBuffer == null) { + return this.sequentialFile.fits(size); + } else { + return timedBuffer.checkSize(size); + } + } + + @Override + public int calculateBlockStart(int position) throws Exception { + return this.sequentialFile.calculateBlockStart(position); + } + + @Override + public String getFileName() { + return this.sequentialFile.getFileName(); + } + + @Override + public void fill(int size) throws Exception { + this.sequentialFile.fill(size); + } + + @Override + public void delete() throws IOException, InterruptedException, ActiveMQException { + this.sequentialFile.delete(); + } + + @Override + public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception { + if (this.timedBuffer != null) { + this.timedBuffer.addBytes(bytes, sync, callback); + } else { + this.sequentialFile.write(bytes, sync, callback); + } + } + + @Override + public void write(ActiveMQBuffer bytes, boolean sync) throws Exception { + if (sync) { + if (this.timedBuffer != null) { + final ResettableIOCallback callback = callbackPool.get(); + try { + this.timedBuffer.addBytes(bytes, true, callback); + callback.waitCompletion(); + } finally { + callback.reset(); + } + } else { + this.sequentialFile.write(bytes, true); + } + } else { + if (this.timedBuffer != null) { + this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance()); + } else { + this.sequentialFile.write(bytes, false); + } + } + } + + @Override + public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception { + if (this.timedBuffer != null) { + this.timedBuffer.addBytes(bytes, sync, callback); + } else { + this.sequentialFile.write(bytes, sync, callback); + } + } + + @Override + public void write(EncodingSupport bytes, boolean sync) throws Exception { + if (sync) { + if (this.timedBuffer != null) { + final ResettableIOCallback callback = callbackPool.get(); + try { + this.timedBuffer.addBytes(bytes, true, callback); + callback.waitCompletion(); + } finally { + callback.reset(); + } + } else { + this.sequentialFile.write(bytes, true); + } + } else { + if (this.timedBuffer != null) { + this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance()); + } else { + this.sequentialFile.write(bytes, false); + } + } + } + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) { + this.sequentialFile.writeDirect(bytes, sync, callback); + } + + @Override + public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception { + this.sequentialFile.writeDirect(bytes, sync); + } + + @Override + public int read(ByteBuffer bytes, IOCallback callback) throws Exception { + return this.sequentialFile.read(bytes, callback); + } + + @Override + public int read(ByteBuffer bytes) throws Exception { + return this.sequentialFile.read(bytes); + } + + @Override + public void position(long pos) throws IOException { + this.sequentialFile.position(pos); + } + + @Override + public long position() { + return this.sequentialFile.position(); + } + + @Override + public void close() throws Exception { + this.sequentialFile.close(); + } + + @Override + public void sync() throws IOException { + this.sequentialFile.sync(); + } + + @Override + public long size() throws Exception { + return this.sequentialFile.size(); + } + + @Override + public void renameTo(String newFileName) throws Exception { + this.sequentialFile.renameTo(newFileName); + } + + @Override + public SequentialFile cloneFile() { + return new TimedSequentialFile(factory, this.sequentialFile.cloneFile()); + } + + @Override + public void copyTo(SequentialFile newFileName) throws Exception { + this.sequentialFile.copyTo(newFileName); + } + + @Override + public void setTimedBuffer(TimedBuffer buffer) { + if (this.timedBuffer != null) { + this.timedBuffer.setObserver(null); + } + this.timedBuffer = buffer; + if (buffer != null) { + buffer.setObserver(this.observer); + } + } + + @Override + public File getJavaFile() { + return this.sequentialFile.getJavaFile(); + } + + private static final class ResettableIOCallback implements IOCallback { + + private final CyclicBarrier cyclicBarrier; + private int errorCode; + private String errorMessage; + + ResettableIOCallback() { + this.cyclicBarrier = new CyclicBarrier(2); + } + + public void waitCompletion() throws InterruptedException, ActiveMQException, BrokenBarrierException { + this.cyclicBarrier.await(); + if (this.errorMessage != null) { + throw ActiveMQExceptionType.createException(this.errorCode, this.errorMessage); + } + } + + public void reset() { + this.errorCode = 0; + this.errorMessage = null; + } + + @Override + public void done() { + try { + this.cyclicBarrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void onError(int errorCode, String errorMessage) { + try { + this.errorCode = errorCode; + this.errorMessage = errorMessage; + this.cyclicBarrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + } + + private static final class DelegateCallback implements IOCallback { + + final List delegates; + + private DelegateCallback() { + this.delegates = new ArrayList<>(); + } + + public List delegates() { + return this.delegates; + } + + @Override + public void done() { + final int size = delegates.size(); + for (int i = 0; i < size; i++) { + try { + final IOCallback callback = delegates.get(i); + callback.done(); + } catch (Throwable e) { + ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e); + } + } + } + + @Override + public void onError(final int errorCode, final String errorMessage) { + for (IOCallback callback : delegates) { + try { + callback.onError(errorCode, errorMessage); + } catch (Throwable e) { + ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e); + } + } + } + } + + private final class LocalBufferObserver implements TimedBufferObserver { + + private final ThreadLocal callbacksPool = ThreadLocal.withInitial(DelegateCallback::new); + + @Override + public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List callbacks) { + buffer.flip(); + if (buffer.limit() == 0) { + //if there are no bytes to flush, can release the callbacks + final int size = callbacks.size(); + for (int i = 0; i < size; i++) { + callbacks.get(i).done(); + } + } else { + final DelegateCallback delegateCallback = callbacksPool.get(); + final int size = callbacks.size(); + final List delegates = delegateCallback.delegates(); + for (int i = 0; i < size; i++) { + delegates.add(callbacks.get(i)); + } + try { + sequentialFile.writeDirect(buffer, requestedSync, delegateCallback); + } finally { + delegates.clear(); + } + } + } + + @Override + public ByteBuffer newBuffer(final int size, final int limit) { + final int alignedSize = factory.calculateBlockSize(size); + final int alignedLimit = factory.calculateBlockSize(limit); + final ByteBuffer buffer = factory.newBuffer(alignedSize); + buffer.limit(alignedLimit); + return buffer; + } + + @Override + public int getRemainingBytes() { + try { + final int remaining = (int) Math.min(sequentialFile.size() - sequentialFile.position(), Integer.MAX_VALUE); + return remaining; + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + @Override + public String toString() { + return "TimedBufferObserver on file (" + getFileName() + ")"; + } + + } +} diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java new file mode 100644 index 0000000000..b426219551 --- /dev/null +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.io; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.stream.Stream; + +import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue; +import org.apache.activemq.artemis.ArtemisConstants; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; +import org.apache.activemq.artemis.jlibaio.LibaioContext; + +/** + * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM + */ +public class JournalTptBenchmark { + + public static void main(String[] args) throws Exception { + final boolean useDefaultIoExecutor = true; + final int fileSize = 1024 * 1024; + final boolean dataSync = true; + final Type type = Type.Mapped; + final int tests = 5; + final int warmup = 20_000; + final int measurements = 20_000; + final int msgSize = 100; + final byte[] msgContent = new byte[msgSize]; + Arrays.fill(msgContent, (byte) 1); + final int totalMessages = (measurements * tests + warmup); + final File tmpDirectory = new File("./"); + //using the default configuration when the broker starts! + final SequentialFileFactory factory; + switch (type) { + + case Mapped: + final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true); + factory = mappedFactory.chunkBytes(fileSize).overlapBytes(0).setDatasync(dataSync); + break; + case Nio: + factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); + break; + case Aio: + factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync); + //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse(); + if (!LibaioContext.isLoaded()) { + throw new IllegalStateException("lib AIO not loaded!"); + } + break; + default: + throw new AssertionError("unsupported case"); + } + + int numFiles = (int) (totalMessages * factory.calculateBlockSize(msgSize)) / fileSize; + if (numFiles < 2) { + numFiles = 2; + } + ExecutorService service = null; + final Journal journal; + if (useDefaultIoExecutor) { + journal = new JournalImpl(fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO()); + journal.start(); + } else { + final ArrayList> tasks = new ArrayList<>(); + service = Executors.newSingleThreadExecutor(); + journal = new JournalImpl(() -> new Executor() { + + private final MpscArrayQueue taskQueue = new MpscArrayQueue<>(1024); + + { + tasks.add(taskQueue); + } + + @Override + public void execute(Runnable command) { + while (!taskQueue.offer(command)) { + LockSupport.parkNanos(1L); + } + } + }, fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO(), 0); + journal.start(); + service.execute(() -> { + final int size = tasks.size(); + final int capacity = 1024; + while (!Thread.currentThread().isInterrupted()) { + for (int i = 0; i < size; i++) { + final MpscArrayQueue runnables = tasks.get(i); + for (int j = 0; j < capacity; j++) { + final Runnable task = runnables.poll(); + if (task == null) { + break; + } + try { + task.run(); + } catch (Throwable t) { + System.err.println(t); + } + } + } + } + + }); + } + try { + journal.load(new ArrayList(), null, null); + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + final EncodingSupport encodingSupport = new EncodingSupport() { + @Override + public int getEncodeSize() { + return msgSize; + } + + @Override + public void encode(ActiveMQBuffer buffer) { + final int writerIndex = buffer.writerIndex(); + buffer.setBytes(writerIndex, msgContent); + buffer.writerIndex(writerIndex + msgSize); + } + + @Override + public void decode(ActiveMQBuffer buffer) { + + } + }; + long id = 1; + { + final long elapsed = writeMeasurements(id, journal, encodingSupport, warmup); + id += warmup; + System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec"); + } + for (int t = 0; t < tests; t++) { + final long elapsed = writeMeasurements(id, journal, encodingSupport, measurements); + System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec"); + id += warmup; + } + + } finally { + journal.stop(); + if (service != null) { + service.shutdown(); + } + final File[] fileToDeletes = tmpDirectory.listFiles(); + System.out.println("Files to deletes" + Arrays.toString(fileToDeletes)); + Stream.of(fileToDeletes).forEach(File::delete); + } + } + + private static long writeMeasurements(long id, + Journal journal, + EncodingSupport encodingSupport, + int measurements) throws Exception { + System.gc(); + TimeUnit.SECONDS.sleep(2); + + final long start = System.nanoTime(); + for (int i = 0; i < measurements; i++) { + write(id, journal, encodingSupport); + id++; + } + final long elapsed = System.nanoTime() - start; + return elapsed; + } + + private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception { + journal.appendAddRecord(id, (byte) 1, encodingSupport, false); + final SimpleWaitIOCallback ioCallback = new SimpleWaitIOCallback(); + journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true, ioCallback); + ioCallback.waitCompletion(); + } + + private enum Type { + + Mapped, Nio, Aio + + } +} diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java new file mode 100644 index 0000000000..7756a064cd --- /dev/null +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.io; + +import java.io.File; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +import org.apache.activemq.artemis.ArtemisConstants; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.jlibaio.LibaioContext; + +/** + * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM + */ +public class SequentialFileTptBenchmark { + + private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback(); + + public static void main(String[] args) throws Exception { + final boolean dataSync = true; + final boolean writeSync = true; + final Type type = Type.Mapped; + final int tests = 10; + final int warmup = 20_000; + final int measurements = 20_000; + final int msgSize = 100; + final byte[] msgContent = new byte[msgSize]; + Arrays.fill(msgContent, (byte) 1); + final File tmpDirectory = new File("./"); + //using the default configuration when the broker starts! + final SequentialFileFactory factory; + switch (type) { + + case Mapped: + final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true); + final int alignedMessageSize = mappedFactory.calculateBlockSize(msgSize); + final int totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup); + factory = mappedFactory.chunkBytes(totalFileSize).overlapBytes(0).setDatasync(dataSync); + break; + case Nio: + factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); + break; + case Aio: + factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync); + //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse(); + if (!LibaioContext.isLoaded()) { + throw new IllegalStateException("lib AIO not loaded!"); + } + break; + default: + throw new AssertionError("unsupported case"); + } + factory.start(); + try { + final EncodingSupport encodingSupport = new EncodingSupport() { + @Override + public int getEncodeSize() { + return msgSize; + } + + @Override + public void encode(ActiveMQBuffer buffer) { + final int writerIndex = buffer.writerIndex(); + buffer.setBytes(writerIndex, msgContent); + buffer.writerIndex(writerIndex + msgSize); + } + + @Override + public void decode(ActiveMQBuffer buffer) { + + } + }; + final int alignedMessageSize = factory.calculateBlockSize(msgSize); + final long totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup); + if (totalFileSize > Integer.MAX_VALUE) + throw new IllegalArgumentException("reduce measurements/warmup"); + final int fileSize = (int) totalFileSize; + final SequentialFile sequentialFile = factory.createSequentialFile("seq.dat"); + sequentialFile.getJavaFile().delete(); + sequentialFile.getJavaFile().deleteOnExit(); + sequentialFile.open(); + final long startZeros = System.nanoTime(); + sequentialFile.fill(fileSize); + final long elapsedZeros = System.nanoTime() - startZeros; + System.out.println("Zeroed " + fileSize + " bytes in " + TimeUnit.NANOSECONDS.toMicros(elapsedZeros) + " us"); + try { + { + final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, warmup, writeSync); + System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec"); + } + for (int t = 0; t < tests; t++) { + final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, measurements, writeSync); + System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec"); + } + } finally { + sequentialFile.close(); + } + } finally { + factory.stop(); + } + } + + private static long writeMeasurements(SequentialFileFactory sequentialFileFactory, + SequentialFile sequentialFile, + EncodingSupport encodingSupport, + int measurements, + boolean writeSync) throws Exception { + //System.gc(); + TimeUnit.SECONDS.sleep(2); + sequentialFileFactory.activateBuffer(sequentialFile); + sequentialFile.position(0); + final long start = System.nanoTime(); + for (int i = 0; i < measurements; i++) { + write(sequentialFile, encodingSupport, writeSync); + } + sequentialFileFactory.deactivateBuffer(); + final long elapsed = System.nanoTime() - start; + return elapsed; + } + + private static void write(SequentialFile sequentialFile, + EncodingSupport encodingSupport, + boolean sync) throws Exception { + //this pattern is necessary to ensure that NIO's TimedBuffer fill flush the buffer and know the real size of it + if (sequentialFile.fits(encodingSupport.getEncodeSize())) { + final FastWaitIOCallback ioCallback = CALLBACK.reset(); + sequentialFile.write(encodingSupport, sync, ioCallback); + ioCallback.waitCompletion(); + } else { + throw new IllegalStateException("can't happen!"); + } + } + + private enum Type { + + Mapped, Nio, Aio + + } + + private static final class FastWaitIOCallback implements IOCallback { + + private final AtomicBoolean done = new AtomicBoolean(false); + private int errorCode = 0; + private String errorMessage = null; + + public FastWaitIOCallback reset() { + errorCode = 0; + errorMessage = null; + done.lazySet(false); + return this; + } + + @Override + public void done() { + errorCode = 0; + errorMessage = null; + done.lazySet(true); + } + + @Override + public void onError(int errorCode, String errorMessage) { + this.errorCode = errorCode; + this.errorMessage = errorMessage; + done.lazySet(true); + } + + public void waitCompletion() throws InterruptedException, ActiveMQException { + final Thread currentThread = Thread.currentThread(); + while (!done.get()) { + LockSupport.parkNanos(1L); + if (currentThread.isInterrupted()) + throw new InterruptedException(); + } + if (errorMessage != null) { + throw ActiveMQExceptionType.createException(errorCode, errorMessage); + } + } + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 7c0a6510db..51fd6cc1f5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.impl.JournalFile; @@ -136,6 +137,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager { ActiveMQServerLogger.LOGGER.journalUseAIO(); journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener); break; + case MAPPED: + ActiveMQServerLogger.LOGGER.journalUseMAPPED(); + //the mapped version do not need buffering by default + journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), criticalErrorListener, true).chunkBytes(config.getJournalFileSize()).overlapBytes(0); + break; default: throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index c365b7d326..7d5822c28a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1558,8 +1558,12 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT) void invalidMessageCounterPeriod(long value); + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224073, value = "Using MAPPED Journal", format = Message.Format.MESSAGE_FORMAT) + void journalUseMAPPED(); + @LogMessage(level = Logger.Level.ERROR) - @Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 224074, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT) void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java index 2716a384e3..df60e9beef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/JournalType.java @@ -40,6 +40,7 @@ public enum JournalType { switch (type) { case "NIO": return NIO; case "ASYNCIO" : return ASYNCIO; + case "MAPPED" : return MAPPED; default: throw new IllegalStateException("Invalid JournalType:" + type + " valid Types: " + validValues); } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 66739fed70..bc9363f252 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -584,6 +584,7 @@ + diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index e538ff0596..2676e19c3b 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -576,6 +576,7 @@ + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java index a220ab659b..b0d19b3a46 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOJournalImplTest.java @@ -59,7 +59,8 @@ public class AIOJournalImplTest extends JournalImplTestUnit { file.mkdir(); - return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false); + // forcing the alignment to be 512, as this test was hard coded around this size. + return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false).setAlignment(512); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java new file mode 100644 index 0000000000..5d540601c6 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedImportExportTest.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.journal; + +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; + +public class MappedImportExportTest extends NIOImportExportTest { + + @Override + protected SequentialFileFactory getFileFactory() throws Exception { + return new MappedSequentialFileFactory(getTestDirfile()); + } +} + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java new file mode 100644 index 0000000000..32b4b8f96c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalCompactTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.journal; + +import java.io.File; + +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; + +public class MappedJournalCompactTest extends NIOJournalCompactTest { + + @Override + protected SequentialFileFactory getFileFactory() throws Exception { + File file = new File(getTestDir()); + + ActiveMQTestBase.deleteDirectory(file); + + file.mkdir(); + + return new MappedSequentialFileFactory(getTestDirfile()); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java new file mode 100644 index 0000000000..940c8a69d8 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedJournalImplTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.journal; + +import java.io.File; + +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit; + +public class MappedJournalImplTest extends JournalImplTestUnit { + + @Override + protected SequentialFileFactory getFileFactory() throws Exception { + File file = new File(getTestDir()); + + deleteDirectory(file); + + file.mkdir(); + + return new MappedSequentialFileFactory(getTestDirfile()); + } + + @Override + protected int getAlignment() { + return fileFactory.getAlignment(); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java new file mode 100644 index 0000000000..cf87cdede9 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.journal; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase; +import org.junit.Assert; +import org.junit.Test; + +public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBase { + + @Override + protected SequentialFileFactory createFactory(String folder) { + return new MappedSequentialFileFactory(new File(folder)); + } + + @Test + public void testInterrupts() throws Throwable { + + final EncodingSupport fakeEncoding = new EncodingSupport() { + @Override + public int getEncodeSize() { + return 10; + } + + @Override + public void encode(ActiveMQBuffer buffer) { + buffer.writeBytes(new byte[10]); + } + + @Override + public void decode(ActiveMQBuffer buffer) { + + } + }; + + final AtomicInteger calls = new AtomicInteger(0); + final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), (code, message, file) -> { + new Exception("shutdown").printStackTrace(); + calls.incrementAndGet(); + }); + + Thread threadOpen = new Thread() { + @Override + public void run() { + try { + Thread.currentThread().interrupt(); + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadOpen.start(); + threadOpen.join(); + + Thread threadClose = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + file.write(fakeEncoding, true); + Thread.currentThread().interrupt(); + file.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadClose.start(); + threadClose.join(); + + Thread threadWrite = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + Thread.currentThread().interrupt(); + file.write(fakeEncoding, true); + file.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadWrite.start(); + threadWrite.join(); + + Thread threadFill = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + Thread.currentThread().interrupt(); + file.fill(1024); + file.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadFill.start(); + threadFill.join(); + + Thread threadWriteDirect = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + ByteBuffer buffer = ByteBuffer.allocate(10); + buffer.put(new byte[10]); + Thread.currentThread().interrupt(); + file.writeDirect(buffer, true); + file.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadWriteDirect.start(); + threadWriteDirect.join(); + + Thread threadRead = new Thread() { + @Override + public void run() { + try { + SequentialFile file = factory.createSequentialFile("file.txt"); + file.open(); + file.write(fakeEncoding, true); + file.position(0); + ByteBuffer readBytes = ByteBuffer.allocate(fakeEncoding.getEncodeSize()); + Thread.currentThread().interrupt(); + file.read(readBytes); + file.close(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threadRead.start(); + threadRead.join(); + + // An interrupt exception shouldn't issue a shutdown + Assert.assertEquals(0, calls.get()); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java index 8f15c4854a..d2ffd6fac3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/ValidateTransactionHealthTest.java @@ -22,8 +22,11 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.ArtemisConstants; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; @@ -102,6 +105,27 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { internalTest("nio2", getTestDir(), 10000, 0, true, true, 1); } + @Test + public void testMMap() throws Exception { + internalTest("mmap", getTestDir(), 10000, 100, true, true, 1); + } + + @Test + public void testMMAPHugeTransaction() throws Exception { + internalTest("mmap", getTestDir(), 10000, 10000, true, true, 1); + } + + @Test + public void testMMAPOMultiThread() throws Exception { + internalTest("mmap", getTestDir(), 1000, 100, true, true, 10); + } + + @Test + public void testMMAPNonTransactional() throws Exception { + internalTest("mmap", getTestDir(), 10000, 0, true, true, 1); + } + + // Package protected --------------------------------------------- private void internalTest(final String type, @@ -234,7 +258,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { if (args.length != 5) { System.err.println("Use: java -cp " + ValidateTransactionHealthTest.class.getCanonicalName() + - " aio|nio "); + " aio|nio|mmap "); System.exit(-1); } System.out.println("Running"); @@ -320,15 +344,22 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase { } public static JournalImpl createJournal(final String journalType, final String journalDir) { - JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500); + JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir, 10485760), "journaltst", "tst", 500); return journal; } - public static SequentialFileFactory getFactory(final String factoryType, final String directory) { + public static SequentialFileFactory getFactory(final String factoryType, final String directory, int fileSize) { if (factoryType.equals("aio")) { return new AIOSequentialFileFactory(new File(directory), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 10, false); } else if (factoryType.equals("nio2")) { return new NIOSequentialFileFactory(new File(directory), true, 1); + } else if (factoryType.equals("mmap")) { + return new MappedSequentialFileFactory(new File(directory), new IOCriticalErrorListener() { + @Override + public void onIOException(Throwable code, String message, SequentialFile file) { + code.printStackTrace(); + } + }, true).chunkBytes(fileSize).overlapBytes(0); } else { return new NIOSequentialFileFactory(new File(directory), false, 1); } From c039aae37fbc3b0eb8fe0b0289fb2af2def84f6a Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 2 Feb 2017 17:16:54 -0500 Subject: [PATCH 4/4] ARTEMIS-906 Adding Paging tests for mapped journal --- .../tests/integration/paging/GlobalPagingTest.java | 4 ++-- .../artemis/tests/integration/paging/PagingTest.java | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java index 3960b49667..84b48eeccc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java @@ -43,8 +43,8 @@ import org.junit.runners.Parameterized; @RunWith(Parameterized.class) public class GlobalPagingTest extends PagingTest { - public GlobalPagingTest(StoreConfiguration.StoreType storeType) { - super(storeType); + public GlobalPagingTest(StoreConfiguration.StoreType storeType, boolean mapped) { + super(storeType, mapped); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 3a932b9c2c..1843239319 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -109,17 +110,20 @@ public class PagingTest extends ActiveMQTestBase { protected static final int PAGE_SIZE = 10 * 1024; + protected final boolean mapped; + protected final StoreConfiguration.StoreType storeType; static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); - public PagingTest(StoreConfiguration.StoreType storeType) { + public PagingTest(StoreConfiguration.StoreType storeType, boolean mapped) { this.storeType = storeType; + this.mapped = mapped; } - @Parameterized.Parameters(name = "storeType={0}") + @Parameterized.Parameters(name = "storeType={0}, mapped={1}") public static Collection data() { - Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}}; + Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE, false}, {StoreConfiguration.StoreType.FILE, true}, {StoreConfiguration.StoreType.DATABASE, false}}; return Arrays.asList(params); } @@ -5654,6 +5658,8 @@ public class PagingTest extends ActiveMQTestBase { Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false); if (storeType == StoreConfiguration.StoreType.DATABASE) { setDBStoreType(configuration); + } else if (mapped) { + configuration.setJournalType(JournalType.MAPPED); } return configuration; }