From 484e9396981a5f56ef8a79ddda826f07a46a78fc Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 9 Oct 2017 10:14:30 -0400 Subject: [PATCH 1/3] NO-JIRA: Speed up cleanupThreads for testsuite --- .../activemq/artemis/api/core/client/ActiveMQClient.java | 4 ++-- .../artemis/core/remoting/impl/invm/InVMConnector.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java index caa2a39885..cb10768032 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java @@ -163,7 +163,7 @@ public final class ActiveMQClient { } if (globalThreadPool != null) { - globalThreadPool.shutdown(); + globalThreadPool.shutdownNow(); try { if (!globalThreadPool.awaitTermination(time, unit)) { globalThreadPool.shutdownNow(); @@ -177,7 +177,7 @@ public final class ActiveMQClient { } if (globalScheduledThreadPool != null) { - globalScheduledThreadPool.shutdown(); + globalScheduledThreadPool.shutdownNow(); try { if (!globalScheduledThreadPool.awaitTermination(time, unit)) { globalScheduledThreadPool.shutdownNow(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java index d3ac0fdfeb..c84020c764 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java @@ -101,7 +101,7 @@ public class InVMConnector extends AbstractConnector { public static synchronized void resetThreadPool() { if (threadPoolExecutor != null) { - threadPoolExecutor.shutdown(); + threadPoolExecutor.shutdownNow(); threadPoolExecutor = null; } } From ba1323c8b28535c857c4373e4d63dcdd5cfa0a95 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 9 Oct 2017 13:36:22 -0400 Subject: [PATCH 2/3] ARTEMIS-1455 Fixing issues on Large Message conversion --- .../activemq/artemis/core/journal/impl/JournalImpl.java | 6 +++++- .../activemq/artemis/core/persistence/StorageManager.java | 5 +++++ .../impl/journal/AbstractJournalStorageManager.java | 6 ++++++ .../artemis/core/server/impl/ServerSessionImpl.java | 2 +- 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 3043b9770e..5f31a2b216 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2207,7 +2207,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal */ @Override public long getMaxRecordSize() { - return Math.min(getFileSize(), fileFactory.getBufferSize()); + if (fileFactory.getBufferSize() == 0) { + return getFileSize(); + } else { + return Math.min(getFileSize(), fileFactory.getBufferSize()); + } } private void flushExecutor(Executor executor) throws InterruptedException { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index ba32252990..6dc45c087e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -65,6 +65,11 @@ import org.apache.activemq.artemis.utils.IDGenerator; */ public interface StorageManager extends IDGenerator, ActiveMQComponent { + default long getMaxRecordSize() { + /** Null journal is pretty much memory */ + return Long.MAX_VALUE; + } + void criticalError(Throwable error); /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 1c5a4c3c5d..3930334b78 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -228,6 +228,12 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, this); } + + public long getMaxRecordSize() { + return messageJournal.getMaxRecordSize(); + } + + /** * Called during initialization. Used by implementations to setup Journals, Stores etc... * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 781334192c..679312c4b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1333,7 +1333,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { boolean noAutoCreateQueue) throws Exception { final Message message; - if ((msg.getEncodeSize() > storageManager.getMessageJournal().getMaxRecordSize()) && !msg.isLargeMessage()) { + if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) && !msg.isLargeMessage()) { message = messageToLargeMessage(msg); } else { message = msg; From d190b611be13f6d330fc743ac2b8ed28bf097c60 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 4 Oct 2017 21:20:03 -0400 Subject: [PATCH 3/3] ARTEMIS-1452 Improvements to IO parameters and options - it is now possible to disable the TimedBuffer - this is increasing the default on libaio maxAIO to 4k - The Auto Tuning on the journal will use asynchronous writes to simulate what would happen on faster disks - If you set datasync=false on the CLI, the system will suggest mapped and disable the buffer timeout This closes #1436 This commit superseeds #1436 since it's now disabling the timed buffer through the CLI --- .../activemq/artemis/cli/commands/Create.java | 45 +++++-- .../commands/tools/journal/PerfJournal.java | 19 ++- .../cli/commands/util/SyncCalculation.java | 29 +++-- .../artemis/cli/commands/etc/broker.xml | 3 +- .../commands/etc/journal-buffer-settings.txt | 11 +- .../cli/commands/etc/ping-settings.txt | 1 + .../apache/activemq/cli/test/ArtemisTest.java | 2 +- .../config/ActiveMQDefaultConfiguration.java | 2 +- .../io/AbstractSequentialFileFactory.java | 7 +- .../core/io/aio/AIOSequentialFileFactory.java | 32 +++-- .../mapped/MappedSequentialFileFactory.java | 117 ++---------------- .../core/io/nio/NIOSequentialFileFactory.java | 5 - .../artemis/core/io/JournalTptBenchmark.java | 2 +- .../core/io/SequentialFileTptBenchmark.java | 2 +- .../impl/FileConfigurationParser.java | 2 +- .../AbstractJournalStorageManager.java | 1 + .../impl/journal/JournalStorageManager.java | 2 +- docs/user-manual/en/configuration-index.md | 2 +- .../journal/AIOUnbuferedJournalImplTest.java | 71 +++++++++++ .../journal/MappedImportExportTest.java | 2 +- .../journal/MappedJournalCompactTest.java | 2 +- .../journal/MappedJournalImplTest.java | 3 +- .../MappedSequentialFileFactoryTest.java | 4 +- .../MappedUnbuferedJournalImplTest.java | 61 +++++++++ .../ValidateTransactionHealthTest.java | 2 +- 25 files changed, 271 insertions(+), 158 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/AIOUnbuferedJournalImplTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedUnbuferedJournalImplTest.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index bd0b4cd2e2..2897f6d93a 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -38,6 +38,7 @@ import java.util.regex.Pattern; import io.airlift.airline.Arguments; import io.airlift.airline.Command; import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.cli.CLIException; import org.apache.activemq.artemis.cli.commands.util.HashUtil; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; @@ -723,6 +724,16 @@ public class Create extends InputAbstract { } private void setupJournalType() { + + if (noJournalSync && !mapped) { + boolean useMapped = inputBoolean("--mapped", "Since you disabled syncs, it is recommended to use the Mapped Memory Journal. Do you want to use the Memory Mapped Journal", true); + + if (useMapped) { + mapped = true; + nio = false; + aio = false; + } + } int countJournalTypes = countBoolean(aio, nio, mapped); if (countJournalTypes > 1) { throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped)."); @@ -803,20 +814,34 @@ public class Create extends InputAbstract { System.out.println(""); System.out.println("Auto tuning journal ..."); - long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, journalType); - long nanoseconds = SyncCalculation.toNanos(time, writes, verbose); - double writesPerMillisecond = (double) writes / (double) time; + if (mapped && noJournalSync) { + HashMap syncFilter = new HashMap<>(); + syncFilter.put("${nanoseconds}", "0"); + syncFilter.put("${writesPerMillisecond}", "0"); + syncFilter.put("${maxaio}", journalType == JournalType.ASYNCIO ? "" + ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio() : "1"); - String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond); + System.out.println("...Since you disabled sync and are using MAPPED journal, we are diabling buffer times"); - HashMap syncFilter = new HashMap<>(); - syncFilter.put("${nanoseconds}", Long.toString(nanoseconds)); - syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr); + filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter)); - System.out.println("done! Your system can make " + writesPerMillisecondStr + - " writes per millisecond, your journal-buffer-timeout will be " + nanoseconds); + } else { + long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, false, "journal-test.tmp", ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio(), journalType); + long nanoseconds = SyncCalculation.toNanos(time, writes, verbose); + double writesPerMillisecond = (double) writes / (double) time; + + String writesPerMillisecondStr = new DecimalFormat("###.##").format(writesPerMillisecond); + + HashMap syncFilter = new HashMap<>(); + syncFilter.put("${nanoseconds}", Long.toString(nanoseconds)); + syncFilter.put("${writesPerMillisecond}", writesPerMillisecondStr); + syncFilter.put("${maxaio}", journalType == JournalType.ASYNCIO ? "" + ActiveMQDefaultConfiguration.getDefaultJournalMaxIoAio() : "1"); + + System.out.println("done! Your system can make " + writesPerMillisecondStr + + " writes per millisecond, your journal-buffer-timeout will be " + nanoseconds); + + filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter)); + } - filters.put("${journal-buffer.settings}", readTextFile(ETC_JOURNAL_BUFFER_SETTINGS, syncFilter)); } catch (Exception e) { filters.put("${journal-buffer.settings}", ""); diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java index 3805de67d7..ab2769f982 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/PerfJournal.java @@ -22,13 +22,13 @@ import java.text.DecimalFormat; import io.airlift.airline.Command; import io.airlift.airline.Option; import org.apache.activemq.artemis.cli.commands.ActionContext; -import org.apache.activemq.artemis.cli.commands.tools.LockAbstract; +import org.apache.activemq.artemis.cli.commands.tools.OptionalLocking; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.core.server.JournalType; @Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder") -public class PerfJournal extends LockAbstract { +public class PerfJournal extends OptionalLocking { @Option(name = "--block-size", description = "The block size for each write (default 4096)") @@ -49,6 +49,15 @@ public class PerfJournal extends LockAbstract { @Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)") public String journalType = null; + @Option(name = "--sync-writes", description = "It will perform each write synchronously, like if you had a single producer") + public boolean syncWrites = false; + + @Option(name = "--file", description = "The file name to be used (default test.tmp)") + public String fileName = "test.tmp"; + + @Option(name = "--max-aio", description = "libaio.maxAIO to be used (default: configuration::getJournalMaxIO_AIO()") + public int maxAIO = 0; + @Override public Object execute(ActionContext context) throws Exception { @@ -74,7 +83,11 @@ public class PerfJournal extends LockAbstract { fileConfiguration.getJournalLocation().mkdirs(); - long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType()); + if (maxAIO <= 0) { + maxAIO = fileConfiguration.getJournalMaxIO_AIO(); + } + + long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), syncWrites, fileName, maxAIO, fileConfiguration.getJournalType()); long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose); double writesPerMillisecond = (double) writes / (double) time; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java index 860bcb6818..5cc6ab994b 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java @@ -50,13 +50,16 @@ public class SyncCalculation { int tries, boolean verbose, boolean fsync, + boolean syncWrites, + String fileName, + int maxAIO, JournalType journalType) throws Exception { - SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks); + SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks, maxAIO); if (verbose) { - System.out.println("Using " + factory.getClass().getName() + " to calculate sync times"); + System.out.println("Using " + factory.getClass().getName() + " to calculate sync times, alignment=" + factory.getAlignment()); } - SequentialFile file = factory.createSequentialFile("test.tmp"); + SequentialFile file = factory.createSequentialFile(fileName); try { file.delete(); @@ -106,10 +109,14 @@ public class SyncCalculation { bufferBlock.position(0); latch.countUp(); file.writeDirect(bufferBlock, true, callback); - if (!latch.await(5, TimeUnit.SECONDS)) { - throw new IOException("Callback wasn't called"); + + if (syncWrites) { + flushLatch(latch); } } + + if (!syncWrites) flushLatch(latch); + long end = System.currentTimeMillis(); result[ntry] = (end - start); @@ -150,6 +157,12 @@ public class SyncCalculation { } } + private static void flushLatch(ReusableLatch latch) throws InterruptedException, IOException { + if (!latch.await(5, TimeUnit.SECONDS)) { + throw new IOException("Timed out on receiving IO callback"); + } + } + public static long toNanos(long time, long blocks, boolean verbose) { double blocksPerMillisecond = (double) blocks / (double) (time); @@ -169,7 +182,7 @@ public class SyncCalculation { return timeWait; } - private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize) { + private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize, int maxAIO) { SequentialFileFactory factory; if (journalType == JournalType.ASYNCIO && !LibaioContext.isLoaded()) { @@ -184,12 +197,12 @@ public class SyncCalculation { factory.start(); return factory; case ASYNCIO: - factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync); + factory = new AIOSequentialFileFactory(datafolder, maxAIO).setDatasync(datasync); factory.start(); ((AIOSequentialFileFactory) factory).disableBufferReuse(); return factory; case MAPPED: - factory = MappedSequentialFileFactory.unbuffered(datafolder, fileSize, null) + factory = new MappedSequentialFileFactory(datafolder, fileSize, false, 0, 0, null) .setDatasync(datasync) .disableBufferReuse(); factory.start(); diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index 241c35486a..013b911487 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -50,7 +50,8 @@ under the License. -1 -${ping-config.settings}${journal-buffer.settings}${connector-config.settings} + 10M + ${journal-buffer.settings}${ping-config.settings}${connector-config.settings} 5000 diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt index 566c29ea78..fc9e2ba1fe 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/journal-buffer-settings.txt @@ -3,6 +3,15 @@ This value was determined through a calculation. Your system could perform ${writesPerMillisecond} writes per millisecond on the current journal configuration. - That translates as a sync write every ${nanoseconds} nanoseconds + That translates as a sync write every ${nanoseconds} nanoseconds. + + Note: If you specify 0 the system will perform writes directly to the disk. + We recommend this to be 0 if you are using journalType=MAPPED and ournal-datasync=false. --> ${nanoseconds} + + + + ${maxaio} diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/ping-settings.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/ping-settings.txt index 242c83559e..c7f824d8e9 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/ping-settings.txt +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/ping-settings.txt @@ -1,3 +1,4 @@ +