From 5e5ac0f0575a89ea81bcedfce63e73949c14c11c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 1 Nov 2016 21:38:02 -0400 Subject: [PATCH] ARTEMIS-832 Openwire was ignoring data syncs. I'm also adding the possibility of sync on libaio, and not only relay on write-cache --- .../activemq/artemis/cli/commands/Create.java | 8 +- .../cli/commands/util/SyncCalculation.java | 7 +- .../artemis/cli/commands/etc/broker.xml | 2 + .../apache/activemq/cli/test/ArtemisTest.java | 8 +- .../config/ActiveMQDefaultConfiguration.java | 7 + .../client/impl/ClientSessionFactoryImpl.java | 3 +- .../store/file/JDBCSequentialFileFactory.java | 12 + .../core/io/AbstractSequentialFile.java | 22 -- .../io/AbstractSequentialFileFactory.java | 15 ++ .../core/io/SequentialFileFactory.java | 4 + .../core/io/aio/AIOSequentialFile.java | 8 +- .../core/io/aio/AIOSequentialFileFactory.java | 2 +- .../core/io/mapped/MappedSequentialFile.java | 19 +- .../mapped/MappedSequentialFileFactory.java | 14 +- .../core/io/nio/NIOSequentialFile.java | 65 +---- artemis-native/bin/libartemis-native-64.so | Bin 25003 -> 28687 bytes ...e_activemq_artemis_jlibaio_LibaioContext.c | 11 +- .../artemis/jlibaio/LibaioContext.java | 12 +- .../artemis/jlibaio/test/LibaioTest.java | 10 +- .../jlibaio/test/OpenCloseContextTest.java | 8 +- .../amqp/broker/AMQPConnectionCallback.java | 2 +- .../amqp/broker/AMQPSessionCallback.java | 9 +- .../protocol/mqtt/MQTTConnectionManager.java | 3 +- .../protocol/openwire/OpenWireConnection.java | 107 ++++---- .../protocol/openwire/amq/AMQSession.java | 4 +- .../protocol/stomp/StompProtocolManager.java | 4 +- .../artemis/core/config/Configuration.java | 17 ++ .../core/config/impl/ConfigurationImpl.java | 13 + .../impl/FileConfigurationParser.java | 2 + .../impl/journal/JournalStorageManager.java | 4 + .../core/impl/ActiveMQPacketHandler.java | 5 +- .../artemis/core/server/ActiveMQServer.java | 6 +- .../core/server/impl/ActiveMQServerImpl.java | 9 +- .../schema/artemis-configuration.xsd | 8 + .../config/impl/FileConfigurationTest.java | 2 + .../artemis/tests/util/ActiveMQTestBase.java | 2 +- .../ConfigurationTest-full-config.xml | 1 + docs/user-manual/en/configuration-index.md | 1 + docs/user-manual/en/persistence.md | 4 + .../integration/persistence/SyncSendTest.java | 235 ++++++++++++++++++ .../vertx/ActiveMQVertxUnitTest.java | 5 +- .../impl/fakes/FakeSequentialFileFactory.java | 10 + 42 files changed, 505 insertions(+), 185 deletions(-) mode change 100644 => 100755 artemis-native/bin/libartemis-native-64.so create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java index ecb9e4963c..be788cdea3 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java @@ -213,6 +213,9 @@ public class Create extends InputAbstract { @Option(name = "--no-hornetq-acceptor", description = "Disable the HornetQ specific acceptor.") boolean noHornetQAcceptor; + @Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal") + boolean noJournalSync; + boolean IS_WINDOWS; boolean IS_CYGWIN; @@ -567,6 +570,7 @@ public class Create extends InputAbstract { filters.put("${web.protocol}", "http"); filters.put("${extra.web.attributes}", ""); } + filters.put("${fsync}", String.valueOf(!noJournalSync)); filters.put("${user}", System.getProperty("user.name", "")); filters.put("${default.port}", String.valueOf(defaultPort + portOffset)); filters.put("${amqp.port}", String.valueOf(AMQP_PORT + portOffset)); @@ -776,7 +780,7 @@ public class Create extends InputAbstract { System.out.println(""); System.out.println("Auto tuning journal ..."); - long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, aio); + long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, aio); long nanoseconds = SyncCalculation.toNanos(time, writes); double writesPerMillisecond = (double) writes / (double) time; @@ -807,7 +811,7 @@ public class Create extends InputAbstract { // forcing NIO return false; } else if (LibaioContext.isLoaded()) { - try (LibaioContext context = new LibaioContext(1, true)) { + try (LibaioContext context = new LibaioContext(1, true, true)) { File tmpFile = new File(directory, "validateAIO.bin"); boolean supportsLibaio = true; try { diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java index 468eabf7ff..315ebdccb1 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java @@ -46,8 +46,9 @@ public class SyncCalculation { int blocks, int tries, boolean verbose, + boolean fsync, boolean aio) throws Exception { - SequentialFileFactory factory = newFactory(datafolder, aio); + SequentialFileFactory factory = newFactory(datafolder, fsync, aio); SequentialFile file = factory.createSequentialFile("test.tmp"); try { @@ -149,9 +150,9 @@ public class SyncCalculation { return timeWait; } - private static SequentialFileFactory newFactory(File datafolder, boolean aio) { + private static SequentialFileFactory newFactory(File datafolder, boolean datasync, boolean aio) { if (aio && LibaioContext.isLoaded()) { - SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1); + SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync); factory.start(); ((AIOSequentialFileFactory) factory).disableBufferReuse(); diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index fe28246b80..58c103c43e 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -45,6 +45,8 @@ under the License. ${data.dir}/large-messages + ${fsync} + 2 -1 diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index ba78fb23dd..2359f1d324 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -129,7 +129,7 @@ public class ArtemisTest { public void testSync() throws Exception { int writes = 20; int tries = 10; - long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true); + long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true); System.out.println(); System.out.println("TotalAvg = " + totalAvg); long nanoTime = SyncCalculation.toNanos(totalAvg, writes); @@ -144,7 +144,7 @@ public class ArtemisTest { Run.setEmbedded(true); //instance1: default using http File instance1 = new File(temporaryFolder.getRoot(), "instance1"); - Artemis.main("create", instance1.getAbsolutePath(), "--silent"); + Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync"); File bootstrapFile = new File(new File(instance1, "etc"), "bootstrap.xml"); Assert.assertTrue(bootstrapFile.exists()); Document config = parseXml(bootstrapFile); @@ -163,7 +163,7 @@ public class ArtemisTest { //instance2: https File instance2 = new File(temporaryFolder.getRoot(), "instance2"); - Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1"); + Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--no-fsync"); bootstrapFile = new File(new File(instance2, "etc"), "bootstrap.xml"); Assert.assertTrue(bootstrapFile.exists()); config = parseXml(bootstrapFile); @@ -184,7 +184,7 @@ public class ArtemisTest { //instance3: https with clientAuth File instance3 = new File(temporaryFolder.getRoot(), "instance3"); - Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2"); + Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2", "--no-fsync"); bootstrapFile = new File(new File(instance3, "etc"), "bootstrap.xml"); Assert.assertTrue(bootstrapFile.exists()); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index e07493f99c..60dd3eb747 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -130,6 +130,9 @@ public final class ActiveMQDefaultConfiguration { // true means that the server will use the file based journal for persistence. private static boolean DEFAULT_PERSISTENCE_ENABLED = true; + // true means that the server will sync data files + private static boolean DEFAULT_JOURNAL_DATASYNC = true; + // Maximum number of threads to use for the scheduled thread pool private static int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5; @@ -456,6 +459,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_PERSISTENCE_ENABLED; } + public static boolean isDefaultJournalDatasync() { + return DEFAULT_JOURNAL_DATASYNC; + } + /** * Maximum number of threads to use for the scheduled thread pool */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index d781fff131..d2d9886d85 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -93,7 +93,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private final long connectionTTL; - private final Set sessions = new HashSet<>(); + private final Set sessions = new ConcurrentHashSet<>(); private final Object createSessionLock = new Object(); @@ -506,6 +506,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C // this is just a debug, since an interrupt is an expected event (in case of a shutdown) logger.debug(e1.getMessage(), e1); } catch (Throwable t) { + logger.warn(t.getMessage(), t); //for anything else just close so clients are un blocked close(); throw t; diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java index cafb261cd2..66f00ec665 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java @@ -60,6 +60,18 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); } + @Override + public SequentialFileFactory setDatasync(boolean enabled) { + + // noop + return this; + } + + @Override + public boolean isDatasync() { + return false; + } + @Override public synchronized void start() { try { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java index 0c6dcdfc36..cd15246e3f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -58,11 +56,6 @@ public abstract class AbstractSequentialFile implements SequentialFile { */ protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver(); - /** - * Used for asynchronous writes - */ - protected final Executor writerExecutor; - /** * @param file * @param directory @@ -75,7 +68,6 @@ public abstract class AbstractSequentialFile implements SequentialFile { this.file = new File(directory, file); this.directory = directory; this.factory = factory; - this.writerExecutor = writerExecutor; } // Public -------------------------------------------------------- @@ -166,20 +158,6 @@ public abstract class AbstractSequentialFile implements SequentialFile { */ @Override public synchronized void close() throws IOException, InterruptedException, ActiveMQException { - final CountDownLatch donelatch = new CountDownLatch(1); - - if (writerExecutor != null) { - writerExecutor.execute(new Runnable() { - @Override - public void run() { - donelatch.countDown(); - } - }); - - while (!donelatch.await(60, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName()); - } - } } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java index 6e61c867d6..5aa723d86c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -52,6 +52,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac protected final int maxIO; + protected boolean dataSync = true; + private final IOCriticalErrorListener critialErrorListener; /** @@ -81,6 +83,19 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac this.maxIO = maxIO; } + + @Override + public SequentialFileFactory setDatasync(boolean enabled) { + this.dataSync = enabled; + return this; + } + + @Override + public boolean isDatasync() { + return dataSync; + } + + @Override public void stop() { if (timedBuffer != null) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java index 81203cf50f..2229edf2f9 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java @@ -95,4 +95,8 @@ public interface SequentialFileFactory { void createDirs() throws Exception; void flush(); + + SequentialFileFactory setDatasync(boolean enabled); + + boolean isDatasync(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index a0d20d2eec..874e411d63 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -97,7 +97,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public SequentialFile cloneFile() { - return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), writerExecutor); + return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), null); } @Override @@ -214,11 +214,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(callback, bytes); runnableCallback.initWrite(positionToWrite, bytesToWrite); - if (writerExecutor != null) { - writerExecutor.execute(runnableCallback); - } else { - runnableCallback.run(); - } + runnableCallback.run(); } AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index da0d0792de..57d18f5c18 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -211,7 +211,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor if (running.compareAndSet(false, true)) { super.start(); - this.libaioContext = new LibaioContext(maxIO, true); + this.libaioContext = new LibaioContext(maxIO, true, dataSync); this.running.set(true); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java index 522dbd1cc3..017948bcbe 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java @@ -49,12 +49,15 @@ final class MappedSequentialFile implements SequentialFile { private String fileName; private MappedFile mappedFile; private ActiveMQBuffer pooledActiveMQBuffer; + private final MappedSequentialFileFactory factory; - MappedSequentialFile(final File directory, + MappedSequentialFile(MappedSequentialFileFactory factory, + final File directory, final File file, final long chunkBytes, final long overlapBytes, final IOCriticalErrorListener criticalErrorListener) { + this.factory = factory; this.directory = directory; this.file = file; this.absoluteFile = null; @@ -155,7 +158,7 @@ final class MappedSequentialFile implements SequentialFile { final int readableBytes = writerIndex - readerIndex; if (readableBytes > 0) { this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -178,7 +181,7 @@ final class MappedSequentialFile implements SequentialFile { final int readableBytes = writerIndex - readerIndex; if (readableBytes > 0) { this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -209,7 +212,7 @@ final class MappedSequentialFile implements SequentialFile { final int readableBytes = writerIndex - readerIndex; if (readableBytes > 0) { this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -235,7 +238,7 @@ final class MappedSequentialFile implements SequentialFile { final int readableBytes = writerIndex - readerIndex; if (readableBytes > 0) { this.mappedFile.write(byteBuf, readerIndex, readableBytes); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -253,7 +256,7 @@ final class MappedSequentialFile implements SequentialFile { final int remaining = limit - position; if (remaining > 0) { this.mappedFile.write(bytes, position, remaining); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -275,7 +278,7 @@ final class MappedSequentialFile implements SequentialFile { final int remaining = limit - position; if (remaining > 0) { this.mappedFile.write(bytes, position, remaining); - if (sync) { + if (factory.isDatasync() && sync) { this.mappedFile.force(); } } @@ -381,7 +384,7 @@ final class MappedSequentialFile implements SequentialFile { @Override public SequentialFile cloneFile() { checkIsNotOpen(); - return new MappedSequentialFile(this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener); + return new MappedSequentialFile(factory, this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener); } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index 23af0b6a5c..8ccef74bc1 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -37,6 +37,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory private final IOCriticalErrorListener criticalErrorListener; private long chunkBytes; private long overlapBytes; + private boolean useDataSync; public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) { this.directory = directory; @@ -72,7 +73,18 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory @Override public SequentialFile createSequentialFile(String fileName) { - return new MappedSequentialFile(directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener); + return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener); + } + + @Override + public SequentialFileFactory setDatasync(boolean enabled) { + this.useDataSync = enabled; + return this; + } + + @Override + public boolean isDatasync() { + return useDataSync; } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 40e0544216..2887d25fe2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -23,8 +23,6 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.Executor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -35,7 +33,6 @@ import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; -import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; public final class NIOSequentialFile extends AbstractSequentialFile { @@ -43,11 +40,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile { private RandomAccessFile rfile; - /** - * The write semaphore here is only used when writing asynchronously - */ - private Semaphore maxIOSemaphore; - private final int defaultMaxIO; private int maxIO; @@ -99,11 +91,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; } - - if (writerExecutor != null && useExecutor) { - maxIOSemaphore = new Semaphore(maxIO); - this.maxIO = maxIO; - } } @Override @@ -124,6 +111,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); throw e; } + channel.force(true); fileSize = channel.size(); } @@ -138,13 +126,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile { public synchronized void close() throws IOException, InterruptedException, ActiveMQException { super.close(); - if (maxIOSemaphore != null) { - while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) { - ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName()); - } - } - - maxIOSemaphore = null; try { if (channel != null) { channel.close(); @@ -202,7 +183,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { @Override public void sync() throws IOException { - if (channel != null) { + if (factory.isDatasync() && channel != null) { try { channel.force(false); } catch (ClosedChannelException e) { @@ -250,7 +231,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { @Override public SequentialFile cloneFile() { - return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor); + return new NIOSequentialFile(factory, directory, getFileName(), maxIO, null); } @Override @@ -298,40 +279,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile { position.addAndGet(bytes.limit()); - if (maxIOSemaphore == null || callback == null) { - // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous - try { - doInternalWrite(bytes, sync, callback); - } catch (ClosedChannelException e) { - throw e; - } catch (IOException e) { - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); - } - } else { - // This is a flow control on writing, just like maxAIO on libaio - maxIOSemaphore.acquire(); - - writerExecutor.execute(new Runnable() { - @Override - public void run() { - try { - try { - doInternalWrite(bytes, sync, callback); - } catch (ClosedChannelException e) { - ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); - } catch (IOException e) { - ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); - factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this); - callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); - } catch (Throwable e) { - ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e); - callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage()); - } - } finally { - maxIOSemaphore.release(); - } - } - }); + try { + doInternalWrite(bytes, sync, callback); + } catch (ClosedChannelException e) { + throw e; + } catch (IOException e) { + factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); } } diff --git a/artemis-native/bin/libartemis-native-64.so b/artemis-native/bin/libartemis-native-64.so old mode 100644 new mode 100755 index 95a5451ee3e57655837dbba55e045c33d0540136..8cbe85138b280e1e4756c977b451a08e65de6b7c GIT binary patch literal 28687 zcmeHwdwf*Ywf{~+AQG620vZwZXafetkU_v8QZwYinJ|Iy6h%4=$%I6bOw3FmC~9mH zJ{-rfsr6cFdt0vc$B(PtmOg%7N?Q%jVB4z^6#=hTilQ?{9=;Ht^ZTxS_MV(`GUJa% z`_J#g;WMnW)?RDvwbx$zaZdKExwph!nwpZrBcvFY zgc&-@1_9D%pD4G&9Kk0c@zPasELcVNHPblS4}PSRnS@eq9^}Z*v%TzsalWW8nO|DT z0k|X-&Eah$SCQLh%~`}6s=UX3$fj@j&F}92)ipJTM*MW?U;jvQWAVwwhwLQZY}lE~ z?n~XL(pPzi; zxCiH+cro+LOIhdd`2M>uowWV(gAon)y|Bjp%Uc|mU4OUpi=*H9{)2%x?|uI8l4Tvo zKff>bU)AF}uUJ#@AA6qqVEf2-4Vj%s$#&G!7klftkdy z_tCM5=z}r%648etP7={Cgf=CC{?sJ(zL$i496A!||9KKS!%65j4N0tLRg!vpljvzp zLZ6sK{|8C*gUk~7=kp}(J%vG@$PeBm_Ru_$h<{lUJ#9(soR*|rtCQ3_4Ff(=y-Sm{ zH%WL~&Qmx=u1h}?^ezkC4z3H>Fm`Z? zP$0l%JWeuMp{RGZp#PNfquxt(0*!OJp2FvRb`fi|=nsi{j|jS4w~B=Q_RG0ATY>h| zwTs73CacKP@fty&BlPrG^xwzh6Mhr+m*`^54|oB~WLqtEx`jWNQvl%V&;ff`&`$}v zTw8uG>fK_g_dH>Lr^WxD33}@lx?^~v+*$PFna`=?6^N6tXZc+Ms9?ytFu~5^x zOxJfsf7xl_#`Tcsuk!@|rW~E#4xFwkVSmq+Ixd%``-GmY*XVSq=QVEUh5g#qB-+L1 zE4qMP2h^X`-d0O{&xm#{7yhi!*;%Tnce!ZS5IDEa>zdogR_l-78#v}R2j^Co`|ABoYfGA%{7uX!^_Ker0q;s5 z3j~{dyaYFOz6MtBt*i4_vU*>AW#d{ZC6LfEu5gZ@RX6#3{p^v2WpiAfxeac=w~E0@ z<-V2Pna$PJz9!b_57f4JP!0UC64mTRg1gM6TQMSX2hZz^vyTV)RFY-48eJz61L@EX=T8OS} zV%2L%Ija`cR@V6gzG4_7std5@hPv8@RZMTsqJ}zZQJHtO*W+(m>G3vtD{FinZ)LD{ zwXgmT55mP)UmNh;uGi;r8!hn!YMO&p{xuD%-w%ZY(}6x(#j5%L1JuyqTO(Y8VF`W%Ju|77NLgv|x1j~$|BWoF=1Yf{rcoX)bpfAmz!O|` z?ZAUZUra=I>r-%Ez@O(}xUZzUsm$Y}s78xd1Ooc{4pKuEafr$L+6La+*G`=b3Fdaq zbh_qx^7AJ5-8$GTkc&K%@*I=U;rLvTN%0Ey<2D*IaAhB%*y3?Si z%RrxFpzkx##a5h`_892$S&#}28t4-YdX5bQ+KH+GwDk zFNuJg40L0BZZXj38Thvv==9!KUfT@x5t0blWuT8V(DxbW^gdW#JqG$HNd!D-pv!j{ zRD8rhzsSHJG0?LO^iu}<3H*+nupXXNpbPkD}Pvm+^nr<%PH-yoL&+M`F0f{)KRQ%8vGMd?(?w zh()_NzMXKI0-{?v{#(Ln=tnnk{0YKo$VWRl{utpjWk*{%{wU$pP0>b-ox>kgwv3Vc5(a$!fEJ4w{mMry&vT;TiV>EKkMw;K3&oJmu)o$8KvZ&Fiw)ECb*bKz0lTzG%* zgJr6=H+GcdTW5@i9BaNxZJY6H6r)P*Sa2BXFUwb0q#PcJ)Zo8md(#;hQ$E~PO;=vl z;AA?@L*^#V#0WoNwtKli{B8FlW`D$o{s=a~sEmcEhjcRz*-w;c&ze{_a zi?k11+Lt>%yWF|ZaX|ZI(fC&vsF))nMFws5 z$2S5vx=88(wIA?%1Vf{)0$WFymYTKU&s@!oIpFRfq0kWOB2>`6N|tDQ)zC$;hqfR% zAUcd{=;%^Ikb#2s8#&t$$S;Sy6wXU5eD6fFaE$9D;a6mET>3k~qorlFzr)L^CZi36 zr=gXjc`Jv1F$#uhd)=WS&RAy#uXxmAsRR~2CY6LoeMGF`;ZXvi>!`6(;LAMSGA%XO z%-a^8(gc%)KSGxdmkQH@i}bc9vgl!5ercgno6pCOem8&7d*XiJJ?nUZ>tvYMv*OdL@Ysvyi z%i4Bl@9K=Vt&N*7V+`0M?|c%AMVHY;NB+AQq{RN1tII?t2!H$UkxSAJX; zzVTxyW08lc>W(fHT6ef4<_h1oJEM0P;>Fe0o#C4HMsVV@g~aZY>_eK5 zVU~=13`;Te+ji&b^PTg1P7eaBkUiT<>{E4iOqRhH)o@F;<8&v(*8;!eog;R>%KF@SNTWu6uCPb~Gic!-pHSyM(rB1QmkYK_euW5H&R2 z@m#gG$gYObnOgHm6!86KwN`$CKGSG*@Z4>%5=+INtPQu4%5YJk3zM#J$olTsGCQE_2(Mt=WcEH-Ub@w3FLe zH^K;hZ0jCG9o9aVO8-jpKf)7RfPC_9crmK_?90_lDNODn*T#J_yT;&~$uAmx6PbrL zV*?bP;>qH}x^a%Gsp=e**yMw5S(S_Kp=>FeZLeX<~%5QIqUZWFrf0Q_$BY(BMOD>!HRWzFl z4g1#x*gRrt-Ge$hx?0Yo)sfpDNdpBP6G8w_x!=av(&+gxtJAs2xzMTo4cod`9Q#%6 z1j*AH#}|7Qk9j5SZ@IKR+n+{+25&$hKQptCc1e*oNISLNYADT3yGU#y--vZqkU+;< zE#uMk8O7R>_~sEa7$o6GNV;^%-reNC$g1}dd1TaiFp47QPBPRaZfqaTH_w+>e(GFY zuI(k`X+tV!9c^LGF!Rjw3e1)>j~Dg!%{*_Sq;J+i#I%IVQrw{l2d3@QW}9c8Tv$|2 zGtYlQhVOCF|I0D&=yf?U3AwZ`M8`954a@4Fzk%zs5P1!En1y!ZPJ5rvLWQUI>eJ9h za05wgzO(J^l;(Ow^ssWRB-^pST)SH-*P5~Jove0hi!PwmqQ zRP%oC8^q@hm^q;M+<*rCGvbqWsXRWLC_a5>5uc8(vkT6tuvibyxt3nX`q2A`H7*~^tg=ochdN*M3rZ^E=)TijYDB! zdH9}ev4_cxjD~wu?Y10+@7G%4P%X`@ps#V%;Vi|aC+-KAn16pK{DAnYB3 z4(uCOz=ZP9pd5JK6{e?N%kw@Q!C_I#nX?Y~x;n+K#%hIMm zA650qA?*dtor6cjEd^LRBVibdy74p|dxe_HH@w4$S?ekfjmbgfPh5`2FB0Ka3fq4~ zwnyLSo4?3MjZk*>KAPD}cCO3G#fU8Edpd|-4f-bs5uf7`&#(LF>~h2HDmR?DW0sb78h&#W1WrgDG^-#ou7;%fK zt%(<_8%>);Y@kyjxv1Vb?L^jtKR_R=TEvOPYsZmvHLVC{s&;p%C4>6(4&J9jRKznk z=MkzI9bA_)Ms{*1buxv8J@QK^g-!fb4e;3h^3XDdUPce^yHxLC-D+bG7bA|wL^Dv4 z^e;`H#oDPp?~CVX^h*68>MyK!Q8d>2%qQiw9;eo`=qK1jeex-FOE%tf6uBx7s+I4n z6o-c+??SwztGQHvjdK{!xl1AxDYP!*?a5>e7i@&YLuvL>dR2&hKI*>$Q4#rX;>DW< zdf-F}(I>$!yv&7V6H(~s>g_h{U)7HN!S9F-D4r}ua9}%#`6oK@9cp+hdV%x|fu86( zs4(six6$*$uD<8f1=<1ZXz6q>Q}26^zjx7&Mpi=#+hZH87WwLZyNSMZSLBVi#a{Fe z`1=R#MNgnYwYv-cj`beo?Mkd~oUU&;)D`rp>y+wl7wUcib=$bI!_xnJ18dlN3mfSP zns(o+b`%l9&8IiH_do&czMAj4QPQ{THr9+cws3Vj+Jp|sltvt*2_0=39XH|y^v5E{ zY1%+*FFE$(Ei=71-VOoOaT!?erRKx;+8d7jOP4uLN9UkS-uk~iauC`q%&S|gTQ{X~?@mID8msT+2mBQLL$tFy2h=EzCyf5~n_kcsU z&#j$OcbvIN-4U7KO4;kY55{FN*Z94`i!s2iM?J2}0~mdw{K(b1)|MxDOHWcu>5VtN z0YZy!BaMdm#N);R?Y~SED_?P(UW&g*bVnT!#rr}E-8W!r8G-rwa6P=$v z459$XH=A&P%-5utC94?Mt*oi3t@A1RsktEPRa|p#P(&G>>LU^k6Zv=@q=F9T^-)l( zJhr6pGpWi7en9mGHmA9+4kR2y#c@+%comK#Hr3ZQ__zWkfU{R*Vsl*;x2_QfC+YYs z&OmVnDQ0LdnIgK!AQ+bl*7y{MLg!=^{rr+r?QiPymuyUcA5*BLipg)4s25cRg8oL` zup0f8tTL7};)v=*{fO#Bc|=uq{6uj?b>aekp4Qb+?Pu(4_)SMzMT2tPXCv8H6NLy} zbI>ck27mL)8YO^}v_7;~(T{EE;UUzUI=m7tBX1bbi0WO>nD89+j?&=sRRxrwUs>Tp zNAr_}gf-HU>ng?Dw6eJ#$4m7ZTn($iQYAg#42}uXkS5<9&9zOwfTAA)6jjtV;CQmP zO7T}KbR1Bbxi%<|8A4^sC27+?0=yKTeMe$3JMBZ>ip2^5YXB<%HvrZEZUfu^n2VuI zyW1K-+J9{Uv|~cb#%fRqNbBShz+9}LEr2zEoq!tvw*VGk7m|xvVIOwXTL8}h?gQL{ zN0n?WZYth8Yy^zpMO8KygC%%%+6s6E@0lZj8}Ke%!6Lg4umJENU_AooGuAXk+l%`i5>Zv)M~+ z*=1RoYcg8dO{1q@eO=C#BuDba_+-2pi{TF{@r&rU;bXVZNoNqBY|#0ORj19~mU^oV zdrN*z#{UoFlZ)~iqRh0}!>MJq><7|HY|8pUMYi0w^kQ3nYN;)^*rqsb*vT zl6d_k@%o*j{KYK+Rm0kX}RvkUxKq`m-^1CgPs@xh-`r9I9r{hkx84 z(Yya#ls``8urEyhd4S?z{U8^`&n(1G(NJ>$6=yb?2_($8zjH7qH$Wf%ZVhj*O6^5F zh6l}zx3^ff_c8=X0*9l$KTcRMD@l82Q+vtZkJFrh=iRfc|B}S@J2O*%@sHPEh5B;_ zVUEF^#7_xX`kS{O<8l39(cg_}BNF#_N#-rKg4zAT3i|1ydLBSMJ$P-&zZ+z!hsPDK zC+_DmY9~mRz@C-4AW``8S`fQ{AEBNMybSIjS}DyFv-Ei)|K6dtYy=c0iqgyq+!yit zVsr$J#|fBomqU(pFqh3wG^j6Sk6P3s5Jlm#}CHnNC4<|k! zwS#X9_*4_$%_cso^C|H4fbVYNgMS`K#q_s64Ui= zzs}BFV=E}>*E`gXAoOp8{u0vPXU{B~G9UJ!L*Rc`X0a`MHoqmVbJsZ!ob$jr51jMB zIS-ukz&Q_`^T0U|ob$jr5B$?TAiw7*zt<=$l;3B>s~CRKcQNUbIDOZWE*xs&*M|Zh zCUBWoN`CiD=8=)#d6M6^yhQNJ_hbBB5NPsym-Ie_E?NHZS24U5X6%$O1do3F$`#Ld z@)>nG-YL+0h`K|giR?Gfv{M> zY5`XZxL&|V1l%m((*o`l@PL4C3wT<7s(0Ri6@@U(z~MPOej;5Y%N3Ro)OVgaiKTrJ>w0Ur@?vw%+vIIF1W1|=7n_{J-f z^9u5&D3kIXQ}U-yx=zWRk3Tl3-k?sucJg?}@&aq?gWeT@!6qHn$UFR%%9ppYp*e3w zb8TJKwY61@V>RAD4a=)q+W<)&2AlMf)gmPUXA_7AWk{9bB>{2QSQlh@JjX{Kr9@$Q zJfBTolb?SVKF?RG&fs`gq-nF`fyuZB?Y$QtcKnu6YTVGoVqx{g0 z|MGa1_>MixtMu3R{iP*g_?kC4VWXHw zE}>ip zNS@Xbt9%P+$aPObG2MVezo;(b6#!zC&lds`(%;jJmt7zd-UHky-zCPsgxPYz zwa7`G?G|~tPD?1)FUfDU|9&AqNiamF=r?{!Q4GXF*ETomAH<^T8n zC*>tPiXvnCOGPQqT$>0a8ue4fHNm(1gUZIXuSDAP*?0&of+sv^IS z#P6j11Pfi7BjNcdmvtLi+eQ7K8i}}4cv16G%`>&7`Z-~>1k|v0%OuJ!^-u<%%9HWI$@>{W^(>F z(}%?8cQZYM$@$t$Pmo6s3~89j<@{>q&y44tHq-m(!KCsuHk`?M&dhI%=V><6vzVNh z%yfG^|FoHYe*8JeOuvB1dBRK|!Q}Wi(?=p-wPGak`!qI+$?<9|L{^>n^N^W-5tHM~ zOqbEvN2RzP#NMZG^qQ_=z*3O;&}u&|Q>oa=$aPB!(f)|&r^LD@=}tknKDXWsdJ3M> zq@8lSRdafOe};t~`5X+CUq8YBDfptbTn_}9ZNp8Xb|q`?dt6Tz%OHad*Jrp-p2WQlKV_PgjfKiO~1L%LMZt@$_qHHn_BpxYDp?LUQ{O*wki z{A>trzW|;5&?D9zIX@4@JeVkduAS5Sna}792HD?32efd>vqn=uC;iquvxQ0YR3@Rf zCZYd`(=(Yh|LM!1Q@z$as;?)}qvr>Qo?PAj$92H2#l|5~e-&~%^7B$=4_woAz?LQ9 zU!8e%LL7Zq3)+#py;%^!t|tzcsJ!h1kTAotrFimkQ z3jVE@IQ&5p{@)~_?-6>e`KgEFc{EYI-vOQS6kGES)4BUZ{4+Qmd7NoegiGcHz87@z z!&ZwQLW16Dq5oXyvF2I+HRy@*CjU|JTk}fC1l^h^+KCB)>}j>sdo$>X+I4#pdaKaW zWzq8h=Z7CA>wcj9JYD;e=s66!!a2&*yZ^1Cw<99ip0+B>WEu zerw+9pCsY`KcFjS(>eA+68>XJ=;_!Oiy>jsb_M7cB=}u~t2jNAS@U;K6Lf1n^cGGx zspiN=!EeoD{u=1?EYa_(YzhW~I0l|qi42itMe{vw*Mfye0Nt1M*%Pey;BFA5KNg+gCc@Szh8PnNv&|u6b+aeO18k(NB^yPw_2toaL?} za3JBeX9oCKEgWXU2q|F=@F?rEO0968wi zO(LDY$>}27zx5{!j7gja*lJV8=mF|5q!>^9n+-fGw%J*j4D^N@W&(%lw+#kr45iPe z`N>>gqMx0JPsQ|?L6aT)_XW%f#P2egi37B;Uv78(M;I)+`432lA6zj1XoQ)OFXM7A zX8KVMEMGjN%>3p>N^iSy!{IUgV24>&|5XG&^P7GoVt|g2DdEi~(KKPMc7W=Q$^7-7 IyD+Q$zk=>U9RL6T literal 25003 zcmeHv3v^UfmiA4GNJ*d)5i}?a7c4OJ!jK@)LBOuc18#*1L;?~;y9lWY2_&hQszA^T z216Wf7iCB7?xDNYq3y@dw2x+Hx_cNLH9Vt^Z8NAe`v0x?!dnIed_cMtYrcIRm0Ojn z8IAp~fBkFq$*S6C@3YT7`|PvNKIh(?I(L`47pAABDKc4<8x_LTt2xCX3U-x<41hx^ zR4&2ycx4>p%@`;el-oL=lO%*>MxZt>p`3%hkQ^!l5^l3_R2d_nsFx0u>g78)Jzvz5 za3={ylF-UgWf&lRx+vO{mBEC(8dPVAhkm zOx<6!WQCIXz(ZT}vv<5!<-geU*vY^9KM%Zr)vrj_6!D?B^S$=hR%sq^Xbf z%$=0>XhyotkrB?mW!$Ci`QOoqpVCx(uEl2>KHtD+IzHFoLn#lR87#-Y3Pk>Tz*!&4Z~;Cy3fuu$j86$Z3-O`k!lw)$H$LU~P+IXXBdS_onsvpI%>I|Jy>ID< zS(oj*=YyBe*naxan}6&4yycy{UwY@7d+vL^>h`YFpKXc$u6Ano zq>U9%9D4kty)SCdM}qe~l=<<-TkhQS@N3Q91z~s7)Bkwvma}ht^4c9$<2&*eu7B#C z2il+9y{_+zR~~>{NMj>9bWXu9Ey|_Jj-2?iY4Eer@g#;Vf&(X`|0@PE z8J*l!O(KWJGa3E86!K@MptoT1CTn+p3jNxp2syI9 z?}3f@n}t5N^J*++6aB1M-=&_fi+=62bH=dX?_mC)sqCJ>359~b8SPWMM-tk-6m*iG zE%cP*^|X**A>>QD{g(M{rqVN+3)Y1EkA)pN#Nu>R(DQ{pM?}9~5%hzi-Lonuv^ztbuuTA|554?{Zd80`g>N`|0-UrECnR}jRgItLmttU5|wk3 zBP0Ddg&jsq`%mJ0t2t0gM88%GKkpIr79sz+gm#}7^ECTf&bUqJKUvW8r*Qfsg8p|v z$v*jF9EH0p4UB*Ada8IlYa9JdocswrFDP=BXS@sw@wEc30c z4+MQJ#SPVgfG?l~>stI9m$fzp>l=NgZ8g5;V71)%kL67qOIw3W zYs-C&{+3OpEiL{QrL1~Gwa4GG)>GYFT~p`tRM!OSH~1Rw@IY5zV|~E0uAzQSb-mx? zW}w*L6!f(PJ!^fzCDp`vv#%vUS`AT7qpvaG3l_E3*7{l$jto^)ZGA(7(h%_Z)+_!d zU%78>wGcZ*)oNROJ|Tg!LshH6U$b7RW#2Uo{(!HfwQ-GB-Q;b6;S?{Bl`Zu_pP(>8 zMRl-FX>Dq#Z(6UEEpd64Ho5)PUZvR|sBiPYMsS$5O-i7-rM@Xxt2C~M%QQACfz~yR z^+7(=J{sC`U$D;aRlLj?EXzky3?b)?Bb9(v2?V{UuG9+gwHrx2#V6gb+!qK`uO+q? z)(T}Bd`$|pLLUjMY2L&@b#RbVGM8K8AF3&9SsT?*r#7&usRmOBDm3{$Xc)dYbSH!} z7tsJ!vWe(v!$b`ZRY$X|g(Xn2_UnpcS`E>M{mmGfA=YKynmh)5&2en>lC++{zU%Jx8_EvZ&1r3AwZ(|2DO!*H%yMy5t{SXoQohkvwE4Hc(SBg{yiC?}4B7)UKmwSGqXPfBe zmn%6Y`W#{uNj1^sdnqc)HPIJIX25(CU2L6MX`zWu@4#fL5_B><_cdO!+C;ZVX23cV z9b4sCYBtg7-I`2oCb}3}R@7ml%V!WO*k+>3y)n_dO>{U)EbTDSN5rUj4{D-kn&`Vs z^pPg|ZWDc!iN41~H;-43iJoQRf6hb~k07k{h>31~FLB&NA8nFz!bHE^L^n+I*eha` zpEc3Pn)v%o^d%;`BA#PtOysjL6qOUg5#UmmstuxUl zoA{ee^lMD?HWPimiQZwNi$`cys&$^S8h58FigA?%Uq6TKw9bRp1JT(JOjDKUl<(m? zYoY@=!t1C`)I8DKb z%J6RxPE#+E&G5;D)0B%S48MwSnre}=Uw|-WG~qPGA}1I=f^eEzks}OGC!D5Kq=(^u z&IL|WDYBd49}!MdD6*5`9}rGcC(_OEw+W}I5$Ry~>x9#kh%__&DB&~}BC8qx0^#K9 zkqUl&?d!5;7`!I4^cL9bnf|3YgmJmE3q-Qg(@ z0YKh$sNZ8Qy^3S zNDDcdwNOFyL~UM79xFeVvb>T6=$~of*|)+cq80gC=yFYeQ_~Nx(n4j;T6pa1@QvtO zQr;e#IR{Mg_1o-cHNC;!56k9oVfybh{aCa+ACrn~JYK7LRSVr_hus_O*#+k{y%#pm ziCp%1G}?`Zx1a%Pl1e!rl|G2U%9^M+SGx4yJ8yQbaEJd91jV@=mbqA|zl;hrlwHre zvs{saqXp+(`%aHBPD5sC*Xhj_r3LT0L$ht@ci$+om$UO=TIt;PHhvq>ntlelG-INyy4`u3^LFRAou0@epGBh>niBk%*ibnUOZgkmEr{c!tJA|AWy3_O70+@AVKDxfJvk{{F{nR3((% zb+k2$m&N>}Alggf72~IWhA~15-UTCT_h7Gyx7u4@So%_}(1ij5rB)PzmcRCw$p>Q;F6M1X=G zs`VNb18zZgw9weeVC(A9)3dhi#wwtN+Tlpy*_l);G@Kk3Eo^NfWmu7ZSPNYleP9>* zgvy5|tK@?ks&Bo;RQ-JfOjGq4DXKqWs(zKJ`n_-yQ9U$apJcLRZ9PfBVXz)d=dlk{ zqq;>4k7{SR!|u>J!Jr=z~l-eLtN1Q6}i9re`Atq$yclQ8;?!Q#MT< zPz_Ry&3}qUnPWuWj?puINEO4gzd<9WzpjM}3wn&P7-N$MndU<%qn`0E*b8O$($?Y` z*az~hMo&!oR!Hamt)F%4UVFbw|9oHnL=C~npkQ>CLd>b0f(z4njlKQRs9S$u(+?Rl zkfnL9A295c4Yym;j2A9Y`AHBFn2hOYL(@Mb|GS@YAE&s4&^6xf4tWuv5StE|=Wl%c z(=TfJ1^t-ON@7BzzOAC`5F9E6K)dLDLA0V7(i5Ux7nI=hmE`F!vxr)7lyw3D^CHyw zXQ4{^UT~X5Q1~OoA|IlF+r_vB?4fMfy%2i`G;XA$tZ)B-i8TFHW;$)lDLz?- zqdlwy`iF?h8C2(OIKi`Ie3yREu%J4c|CScAxT%Cp6z#46W2nSlsD&P~ce7!7+`a?Q z9r~GlC*fNTlJJ4`%?Lpb)9WehamdW;&I?!Q+e`6AKL#aOt5A%I)ZaGd@Ww~6rnVd; z@~Q)dLyBq^q9(>fy)NTpa(O4gh|btF2+W6$Lf{#YnQjw>z?VM7F!uIPsOvi(8=n(2 zI`A`QKlmSC{Bkiho%)OMMl9X>u1h}@ya}$g_qQdoE{Y}>WC8)A0@}J=8CVw%l@6x}&59#pl+6gLTe+mEoIb<>W ztr;q5)m9)8V>aRMN zyYx?JZ=wI85^CkrT9;!?V47rgjl$TK*IXpQd#lusY|yx_Q>K4zTLW5f^JYJECj|XzS}VAD2Ni7CemUzy{$!_p==&evafn z5PU@ox8=Y9l_1et`mZS0!go19atPATKyvF*ZQuJD#^)cAotE2kG`-kLVGW_jtq1DR zVu`-Kxwkk7W{lb8y~S;m>n(045bb6$KrhBJPqFF^@bB$l1;WZ^Ey~?PQYDc}R_E99)k9r@^iQABkb(>$Y3nq1`mV#&c)H zj_0s5dW?p$wBURQg47ansqd*GKCLO##Ph4nLK zq2+~uW9-IL;ETEE;yNuf#;(gppp=IDmR3bA$GgcRx-#mH;KC zwgXb1PlE@*7mP_@?R#0oS2x0G4xZh$@In@2 zH+-9FJOj(dI&X@fZhcl<{3PC^;6*S-1E=x4j0WI*AND2DrUT{|(^GVXHQsk0i#Orf z5Ay29xxrmw>lMa)IDcv$cwDxk&pB)#63Vz~3}S3yS2Q>-_V7KG^aw3jixQ)ZV*dt$fV&RzsnLA!GOa0%Aek(&NeEV|=CB5UjYY!1|6ew;Bb zqIXWLDecaAC3eiv-=RbBT^GVcofGl9oeIS?Wi0N{{1>R8t^HDGF93}d3Hk(r%DbXj zTOVX5vC#6n8V<*sW^-24_hTJpQ`V)g%yH`eF<{VYu%eag8*^RyB9(TSs949hL$1(# zJDbTaeK?!&)V&;I+6QcA9%?J5O**1agFT-%>4mgOcaX1WZj4X`hP>(k=~hKU7@mC- z{D!80D+0ehn9{GQ#z{0mJr6oN8|>}e96VNWBf!1M2tjTHwxy$LXHZQi56>LNnu3Rq zL}QWV7}?l91$$^%h3!CSPW63PuHR2txPIdxB{GRsaX{hQ>|b-de_|T%QKDmVy#F}V z@g4?2Y`lN^6~`OVdJ&@aS0DD7h8TADml*cg`pL#0V?g822gPvV-C^HT|NZ!*t)YxR zs`2rs1yFyJ#vhGE7Js?%$2gIHj)Rm)4yw=_W_k}zn>56mjGd!YY%Kc`xbXh_K_K)T z(Aj6jJ9Nw&6ttd$&s$f~yJ@Oo{0fqYazIhm;h4ybhU%uZGX|ZuE3R&8@&{Eqq*Bk$ z{i!&$=2MomHZ-Wsew^pRDYzLyoaUMmm>~|}mAS>)Fi#arCh>!g2C|mx{WHwx0CQdL z0fj1qI5f6)-n5%>M2;QeQF4)|BVtOizqP@ul6K8FJ41)LaL|gYC$#qBL`X|xeG{ju z0i2e)K@nwi(1 zIEC1P(=EOhrtkoVSG2yd4rW#P36dbG;aYlwirK@Gt7!vl?^XHXBegZ)^G;X!`4F|m zcSmb|i!Y$^(~W|qz6odSs=cbeRxQKeszsZE@@OC=4p=VM!C;+FEl}wUqHLws-!jk> zX_x@}cXth0p6pWt!^7r*pud^fO|9eS1l1{w5vMd~@Kc&IoV@dmlW@FA;#3CTL<<5F_)mcTcCP5#!kb!q^IRei9h95&`KTpG@)Ty2?m3dneT zj=vX;(tF*0Kov0OWHeexlOM1OunMpla1Y=Pz#WJo{eZ^-a}Zhj0rLS1F}*7Qk0Ta$ z0Ma_Q6L1$`51qIlveT{} zIo#TfJ`kO3@Z7u6DE$*uCZf;6=STvbc-{CM2R#>1aoVyw({HuWLjWbJ*MN_Ka?v!A z6+1p#@i_td?L=5&Q}0dhv^0#cSr=wj#p>UNqQ~*c!C1UPd_}hG@1~d9a_+Mf+0^hb zmo0aDhSQdR?R*CCfw!+duWYT{mjUnV0 zlKc`|_6q2*G;=Z2f%Kn_@;xZON|vL9>Xm@r1Ns6%zX|mg3*D!K=0*7ll>Z20%3XIVv-c+`SFzSUOy%Vy+HKpNwmm)lcJP#u7JOXBL*E^= zCKG{;=S{YqX)C$5&H=s>@b!R?=@a7<9W?PZfX@J556LUPcUWh}5?g*aeY+*yhlVvK zko6yY9hfUKh>yuz%F36?{-eAb<*dDg{*!OMLi(UQEb8BzUS`Ybv@Ejarx%T|873jQ|A&5-8`F~Nu`shT znHrE!iP{-64E9ItVZXsim}dj+IJq5~Eq_ejPHE<?Ag%z}`*XD4hWAiMiB!U-q_re7{hw$m(Yzgxm|2Ke%6^kl ze(xon!*8{%{%(4B*nO7m85S$lu?p|z{BH2~8O7d*zzX*IJC&zvtjsM!PB!5@==@(J#vaz2K zf)F}b+9eLL$~{spVm>8#wzXf#=c_N)D_*P!HpNk-G76SJmbyg&?O!N~>hw!FmaJnr zl=PiAO+ZQR+vuGxCA@27Nw$Z7Ix+nnf*stVF8Z`cv8Ug z0$##4uQ>B7;8X$U2)Izdn+2>DaD#yN3ity7e<9#+1w1U^F#%5scwWFut`_|la8Ysb z4QeiKe3+)r%A1wXbb1rY6+nFxbLNHi+LIGB$yDP=?>8R8s|!Yi<)#FV_6pdY``%ky%ut_Fovfuy4@ z)1IWOtij*^6Sj>f*RC5lUVz;K4a zFYC*AB_TckQj+mU#tlxz{s|e1RkFT}XA*XZz(eg4kE}1Ii{b!{WXt+8K1wL#Ay`;S zj4v9f6q{sy8INL6)RX#4J_%Q#oMN5CWxSJcwWyz{zaX>yv@9cP_$u2^?0*yL znCr`UEuoC>RG-#}#QJTZkxk@!BJb;vP}&(REKytL6aZqbzgtw0uvr>LRFd^^wuGh4 z_%_#<_v=W=PE-qC0A* zGoY-NigHxpr2+vsi4QfAKR{wRS)ZO$%}J^wA^jjiwr%G7wP^p0nTWisC&`3M-9_?be~G%{V}vJrTbt`B-pJ}(2WH851AaP-2i+d1^szO z&r}lcv#}vqjvlBU%S~eR!S^K11YM1B4P=}t=qrSr#Cv7_9(3yO*`ZpDx(j+cj`vaNzdb=U8!e<;7`1Fhu(?P z_$J;9^?OE_+6}-nLjH3J@;?)DdM0yTO;mBB=3C z^VA2rP;sDB$PLFnk?fFo|5Gue$FVZx7Qw&M&db>ULXiE5p!ZDT^wk_F^y~d(cDNw; zRiPC7pAqWJ!=@}5{}Rw?e2*m9|CSW|4JqjNq@dHS0c3~7d$x87dg6U)r&7qFe?pR+ z#Cyd?;CYnv-;6GDFz^9HR@w1cqaRB(t-c zf_@{TqhCV4*rq9~Ku>lb*xD3w+J&6N`_djr!T$@vpLh?}+k##pHn8$OuqoIWlAV4)E2TWpNzcT4$gT%HnH?%p&}*4|F4O#dC!=4f^dyY$y-W`7hnv8;#p^0%e+v1> zQqa$Xp3Ke{Q}B<*{x?~_W~ZQQDd_7M9r~+WfA+s)WbaDBA7=C<`%gMVVY+fmsc8uY zf;eoGM^`2lFJHRM<91apr<=$;C2?3;v8l+58*3W;YpNSO>=l@&y0uNgd#z@=7{;47 z=lWT56)N!5d)t7`E>QSeG+b%bxJkBCx}=0Pvv8TSywoExYDVSQf3_%RyUThVgKElZr`uHvtEw+y?Th94b@yH+dt z9c}!6GlxUo7eklC#NX-_Uq)w*hHy7q{M|k58mD-<>|QzZ8L`9*>XO_=H$+|Hu4qcQ zkdEJjH|Pp9ev??d#zSl!H!iW;v-o9d@gftAAn&w_8%=g2+7KK6I(KiGZ*Ypg8H(Ox z$9KeMy6kO;swP}oM%UfNw=s0N@wWuxRX;9^iivIAZrM6~tdH6@Q6h zyoe#{%&rkc^Zf8&eA7dlkAJD_&IYc(6LireEU8Vi_FG7o~}dC%;5(aVumz=Ma0$F4G%gT|TAxeQ>nwu!}a? zeQ&XFBd)bG*$ev`d8~J!1Gwpp-;+5&qQ#syz;D(ZqK@p|KYr8#Z`9V m-p=^4_`2efzIYaP&1XC*z81fdFrH)3t)R5##k26+7yk>;BoxvB diff --git a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c index 74545fc508..3f7c21364f 100644 --- a/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c +++ b/artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.c @@ -536,7 +536,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su } JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll - (JNIEnv * env, jobject thisObject, jobject contextPointer) { + (JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) { #ifdef DEBUG fprintf (stdout, "Running blockedPoll\n"); @@ -553,6 +553,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl short running = 1; + int lastFile = -1; + while (running) { int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0); @@ -574,6 +576,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl fflush(stdout); #endif + lastFile = -1; + for (i = 0; i < result; i++) { #ifdef DEBUG @@ -593,6 +597,11 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl break; } + if (useFdatasync && lastFile != iocbp->aio_fildes) { + lastFile = iocbp->aio_fildes; + fdatasync(lastFile); + } + int eventResult = (int)event->res; diff --git a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java index 8049a97529..cdaea55963 100644 --- a/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java +++ b/artemis-native/src/main/java/org/apache/activemq/artemis/jlibaio/LibaioContext.java @@ -49,7 +49,7 @@ public class LibaioContext implements Closeable { *
* Or else the native module won't be loaded because of version mismatches */ - private static final int EXPECTED_NATIVE_VERSION = 6; + private static final int EXPECTED_NATIVE_VERSION = 7; private static boolean loaded = false; @@ -146,6 +146,8 @@ public class LibaioContext implements Closeable { final int queueSize; + final boolean useFdatasync; + /** * The queue size here will use resources defined on the kernel parameter * fs.aio-max-nr . @@ -153,11 +155,13 @@ public class LibaioContext implements Closeable { * @param queueSize the size to be initialize on libaio * io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr. * @param useSemaphore should block on a semaphore avoiding using more submits than what's available. + * @param useFdatasync should use fdatasync before calling callbacks. */ - public LibaioContext(int queueSize, boolean useSemaphore) { + public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) { try { contexts.incrementAndGet(); this.ioContext = newContext(queueSize); + this.useFdatasync = useFdatasync; } catch (Exception e) { throw e; } @@ -349,7 +353,7 @@ public class LibaioContext implements Closeable { */ public void poll() { if (!closed.get()) { - blockedPoll(ioContext); + blockedPoll(ioContext, useFdatasync); } } @@ -436,7 +440,7 @@ public class LibaioContext implements Closeable { /** * This method will block as long as the context is open. */ - native void blockedPoll(ByteBuffer libaioContext); + native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync); static native int getNativeVersion(); diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java index 7f98f0d965..10139660da 100644 --- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java +++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/LibaioTest.java @@ -54,7 +54,7 @@ public class LibaioTest { parent.mkdirs(); boolean failed = false; - try (LibaioContext control = new LibaioContext<>(1, true); LibaioFile fileDescriptor = control.openFile(file, true)) { + try (LibaioContext control = new LibaioContext<>(1, true, true); LibaioFile fileDescriptor = control.openFile(file, true)) { fileDescriptor.fallocate(4 * 1024); } catch (Exception e) { e.printStackTrace(); @@ -80,7 +80,7 @@ public class LibaioTest { @Before public void setUpFactory() { - control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true); + control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, true); } @After @@ -532,10 +532,10 @@ public class LibaioTest { boolean exceptionThrown = false; control.close(); - control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false); + control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false, true); try { // There is no space for a queue this huge, the native layer should throw the exception - LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false); + LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false, true); } catch (RuntimeException e) { exceptionThrown = true; } @@ -630,7 +630,7 @@ public class LibaioTest { @Test public void testBlockedCallback() throws Exception { - final LibaioContext blockedContext = new LibaioContext(500, true); + final LibaioContext blockedContext = new LibaioContext(500, true, true); Thread t = new Thread() { @Override public void run() { diff --git a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java index c04bff447b..b515663e18 100644 --- a/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java +++ b/artemis-native/src/test/java/org/apache/activemq/artemis/jlibaio/test/OpenCloseContextTest.java @@ -53,7 +53,7 @@ public class OpenCloseContextTest { for (int i = 0; i < 10; i++) { System.out.println("#test " + i); - final LibaioContext control = new LibaioContext<>(5, true); + final LibaioContext control = new LibaioContext<>(5, true, true); Thread t = new Thread() { @Override public void run() { @@ -111,7 +111,7 @@ public class OpenCloseContextTest { for (int i = 0; i < 10; i++) { System.out.println("#test " + i); - final LibaioContext control = new LibaioContext<>(5, true); + final LibaioContext control = new LibaioContext<>(5, true, true); Thread t = new Thread() { @Override public void run() { @@ -164,9 +164,9 @@ public class OpenCloseContextTest { @Test public void testCloseAndStart() throws Exception { - final LibaioContext control2 = new LibaioContext<>(5, true); + final LibaioContext control2 = new LibaioContext<>(5, true, true); - final LibaioContext control = new LibaioContext<>(5, true); + final LibaioContext control = new LibaioContext<>(5, true, true); control.close(); control.poll(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 4ced546ce6..24c625c028 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -177,7 +177,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { } public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { - return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor); + return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext()); } public void sendSASLSupported() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 66c7b4bab8..acbb2e9d34 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; @@ -81,6 +82,8 @@ public class AMQPSessionCallback implements SessionCallback { private ServerSession serverSession; + private final OperationContext operationContext; + private AMQPSessionContext protonSession; private final Executor closeExecutor; @@ -91,12 +94,14 @@ public class AMQPSessionCallback implements SessionCallback { ProtonProtocolManager manager, AMQPConnectionContext connection, Connection transportConnection, - Executor executor) { + Executor executor, + OperationContext operationContext) { this.protonSPI = protonSPI; this.manager = manager; this.connection = connection; this.transportConnection = transportConnection; this.closeExecutor = executor; + this.operationContext = operationContext; } @Override @@ -151,7 +156,7 @@ public class AMQPSessionCallback implements SessionCallback { false, // boolean autoCommitAcks, false, // boolean preAcknowledge, true, //boolean xa, - (String) null, this, true); + (String) null, this, true, operationContext); } @Override diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 3a1f4479e4..ce656480dd 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -96,7 +96,8 @@ public class MQTTConnectionManager { String id = UUIDGenerator.getInstance().generateStringUUID(); ActiveMQServer server = session.getServer(); - ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE); + ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, + session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext()); return (ServerSessionImpl) serverSession; } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 33418e6c71..8dc0b348c7 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -119,12 +120,15 @@ import org.apache.activemq.state.SessionState; import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; +import org.jboss.logging.Logger; /** * Represents an activemq connection. */ public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth { + private static final Logger logger = Logger.getLogger(OpenWireConnection.class); + private static final KeepAliveInfo PING = new KeepAliveInfo(); private final OpenWireProtocolManager protocolManager; @@ -139,17 +143,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private final AtomicBoolean stopping = new AtomicBoolean(false); - private boolean inServiceException; - - private final AtomicBoolean asyncException = new AtomicBoolean(false); - - // Clebert: Artemis session has meta-data support, perhaps we could reuse it here private final Map sessionIdMap = new ConcurrentHashMap<>(); private final Map consumerExchanges = new ConcurrentHashMap<>(); private final Map producerExchanges = new ConcurrentHashMap<>(); - // Clebert TODO: Artemis already stores the Session. Why do we need a different one here private final Map sessions = new ConcurrentHashMap<>(); private ConnectionState state; @@ -172,6 +170,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se */ private ServerSession internalSession; + private final OperationContext operationContext; + private volatile long lastSent = -1; private ConnectionEntry connectionEntry; private boolean useKeepAlive; @@ -185,6 +185,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se OpenWireFormat wf) { super(connection, executor); this.server = server; + this.operationContext = server.newOperationContext(); this.protocolManager = openWireProtocolManager; this.wireFormat = wf; this.useKeepAlive = openWireProtocolManager.isUseKeepAlive(); @@ -201,6 +202,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return info.getUserName(); } + + public OperationContext getOperationContext() { + return operationContext; + } + // SecurityAuth implementation @Override public RemotingConnection getRemotingConnection() { @@ -239,6 +245,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se super.bufferReceived(connectionID, buffer); try { + recoverOperationContext(); + Command command = (Command) wireFormat.unmarshal(buffer); boolean responseRequired = command.isResponseRequired(); @@ -285,17 +293,38 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - // TODO: response through operation-context - - if (response != null && !protocolManager.isStopping()) { - response.setCorrelationId(commandId); - dispatchSync(response); - } + sendAsyncResponse(commandId, response); } } catch (Exception e) { ActiveMQServerLogger.LOGGER.debug(e); sendException(e); + } finally { + clearupOperationContext(); + } + } + + /** It will send the response through the operation context, as soon as everything is confirmed on disk */ + private void sendAsyncResponse(final int commandId, final Response response) throws Exception { + if (response != null) { + operationContext.executeOnCompletion(new IOCallback() { + @Override + public void done() { + if (!protocolManager.isStopping()) { + try { + response.setCorrelationId(commandId); + dispatchSync(response); + } catch (Exception e) { + sendException(e); + } + } + } + + @Override + public void onError(int errorCode, String errorMessage) { + sendException(new IOException(errorCode + "-" + errorMessage)); + } + }); } } @@ -626,7 +655,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } private void createInternalSession(ConnectionInfo info) throws Exception { - internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true); + internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext); } //raise the refCount of context @@ -1083,7 +1112,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processBeginTransaction(TransactionInfo info) throws Exception { final TransactionId txID = info.getTransactionId(); - setOperationContext(null); try { internalSession.resetTX(null); if (txID.isXATransaction()) { @@ -1101,7 +1129,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } finally { internalSession.resetTX(null); - clearOpeartionContext(); } return null; } @@ -1118,12 +1145,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se AMQSession session = (AMQSession) tx.getProtocolData(); - setOperationContext(session); - try { - tx.commit(onePhase); - } finally { - clearOpeartionContext(); - } + tx.commit(onePhase); return null; } @@ -1137,21 +1159,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processForgetTransaction(TransactionInfo info) throws Exception { TransactionId txID = info.getTransactionId(); - setOperationContext(null); - try { - if (txID.isXATransaction()) { - try { - Xid xid = OpenWireUtil.toXID(info.getTransactionId()); - internalSession.xaForget(xid); - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - } else { - txMap.remove(txID); + if (txID.isXATransaction()) { + try { + Xid xid = OpenWireUtil.toXID(info.getTransactionId()); + internalSession.xaForget(xid); + } catch (Exception e) { + e.printStackTrace(); + throw e; } - } finally { - clearOpeartionContext(); + } else { + txMap.remove(txID); } return null; @@ -1161,7 +1178,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processPrepareTransaction(TransactionInfo info) throws Exception { TransactionId txID = info.getTransactionId(); - setOperationContext(null); try { if (txID.isXATransaction()) { try { @@ -1177,7 +1193,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } finally { internalSession.resetTX(null); - clearOpeartionContext(); } return new IntegerResponse(XAResource.XA_RDONLY); @@ -1187,7 +1202,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processEndTransaction(TransactionInfo info) throws Exception { TransactionId txID = info.getTransactionId(); - setOperationContext(null); if (txID.isXATransaction()) { try { Transaction tx = lookupTX(txID, null); @@ -1204,7 +1218,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } else { txMap.remove(txID); - clearOpeartionContext(); } return null; @@ -1267,13 +1280,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se Transaction tx = lookupTX(messageSend.getTransactionId(), session); - setOperationContext(session); session.getCoreSession().resetTX(tx); try { session.send(producerInfo, messageSend, sendProducerAck); } finally { session.getCoreSession().resetTX(null); - clearOpeartionContext(); } return null; @@ -1283,7 +1294,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processMessageAck(MessageAck ack) throws Exception { AMQSession session = getSession(ack.getConsumerId().getParentId()); Transaction tx = lookupTX(ack.getTransactionId(), session); - setOperationContext(session); session.getCoreSession().resetTX(tx); try { @@ -1291,7 +1301,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se consumerBrokerExchange.acknowledge(ack); } finally { session.getCoreSession().resetTX(null); - clearOpeartionContext(); } return null; } @@ -1367,17 +1376,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } - private void setOperationContext(AMQSession session) { - OperationContext ctx; - if (session == null) { - ctx = this.internalSession.getSessionContext(); - } else { - ctx = session.getCoreSession().getSessionContext(); - } - server.getStorageManager().setContext(ctx); + private void recoverOperationContext() { + server.getStorageManager().setContext(this.operationContext); } - private void clearOpeartionContext() { + private void clearupOperationContext() { server.getStorageManager().clearContext(); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 714a29aa36..426f4e64ef 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -107,7 +107,7 @@ public class AMQSession implements SessionCallback { // now try { - coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true); + coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext()); long sessionId = sessInfo.getSessionId().getValue(); if (sessionId == -1) { @@ -290,8 +290,6 @@ public class AMQSession implements SessionCallback { } else { final Connection transportConnection = connection.getTransportConnection(); - // new Exception("Setting to false").printStackTrace(); - if (transportConnection == null) { // I don't think this could happen, but just in case, avoiding races runnable = null; diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 46f8e4ce4c..6029b3785b 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -230,7 +230,7 @@ public class StompProtocolManager extends AbstractProtocolManagerfdatasync + * + * @return a boolean + */ + boolean isJournalDatasync(); + + /** + * documented at {@link #isJournalDatasync()} ()} + * + * @param enable + * @return this + */ + Configuration setJournalDatasync(boolean enable); + /** * @return usernames mapped to ResourceLimitSettings */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 261392923a..7e851a90a7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -77,6 +77,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled(); + private boolean journalDatasync = ActiveMQDefaultConfiguration.isDefaultJournalDatasync(); + protected long fileDeploymentScanPeriod = ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod(); private boolean persistDeliveryCountBeforeDelivery = ActiveMQDefaultConfiguration.isDefaultPersistDeliveryCountBeforeDelivery(); @@ -297,6 +299,17 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + @Override + public boolean isJournalDatasync() { + return journalDatasync; + } + + @Override + public ConfigurationImpl setJournalDatasync(boolean enable) { + journalDatasync = enable; + return this; + } + @Override public long getFileDeployerScanPeriod() { return fileDeploymentScanPeriod; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index d90c3437bb..02a9e94d88 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -486,6 +486,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { } } + config.setJournalDatasync(getBoolean(e, "journal-datasync", config.isJournalDatasync())); + config.setJournalSyncTransactional(getBoolean(e, "journal-sync-transactional", config.isJournalSyncTransactional())); config.setJournalSyncNonTransactional(getBoolean(e, "journal-sync-non-transactional", config.isJournalSyncNonTransactional())); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 24650e1473..c0ef93eca2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -118,6 +118,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO()); + bindingsFF.setDatasync(config.isJournalDatasync()); Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0); @@ -135,6 +136,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); } + journalFF.setDatasync(config.isJournalDatasync()); + + Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0); messageJournal = localMessage; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index 149c0112eb..64e496aab6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; @@ -150,7 +151,9 @@ public class ActiveMQPacketHandler implements ChannelHandler { activeMQPrincipal = connection.getDefaultActiveMQPrincipal(); } - ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true); + OperationContext sessionOperationContext = server.newOperationContext(); + + ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); channel.setHandler(handler); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index a43fec8ea4..aa1cb347d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.remoting.server.RemotingService; @@ -180,7 +181,8 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean xa, String defaultAddress, SessionCallback callback, - boolean autoCreateQueues) throws Exception; + boolean autoCreateQueues, + OperationContext context) throws Exception; SecurityStore getSecurityStore(); @@ -192,6 +194,8 @@ public interface ActiveMQServer extends ActiveMQComponent { HierarchicalRepository getAddressSettingsRepository(); + OperationContext newOperationContext(); + int getConnectionCount(); long getTotalConnectionCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index d2de96480e..2cd9328cb6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -423,6 +423,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { return manager; } + @Override + public OperationContext newOperationContext() { + return getStorageManager().newContext(getExecutorFactory().getExecutor()); + } + @Override public final synchronized void start() throws Exception { if (state != SERVER_STATE.STOPPED) { @@ -1188,7 +1193,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean xa, final String defaultAddress, final SessionCallback callback, - final boolean autoCreateQueues) throws Exception { + final boolean autoCreateQueues, + final OperationContext context) throws Exception { String validatedUser = ""; if (securityStore != null) { @@ -1201,7 +1207,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { checkSessionLimit(validatedUser); - final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor()); final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues); sessions.put(name, session); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 73aa20b0d2..a58c33fc91 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -46,6 +46,14 @@ + + + + that means the server will use fdatasync to confirm writes on the disk. + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index a2afd976d7..e8abcd5e65 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -354,6 +354,8 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(1234567, conf.getGlobalMaxSize()); assertEquals(37, conf.getMaxDiskUsage()); assertEquals(123, conf.getDiskScanPeriod()); + + assertEquals(false, conf.isJournalDatasync()); } @Test diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 29119f8d73..7f01767cb1 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -450,7 +450,7 @@ public abstract class ActiveMQTestBase extends Assert { * @throws Exception */ protected ConfigurationImpl createBasicConfig(final int serverID) { - ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD); + ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD).setJournalDatasync(false); return configuration; } diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 89844056a3..f1b1774f47 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -49,6 +49,7 @@ 10111213 8 127 + false true true 98765 diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index c47861bb3e..65ef931906 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -62,6 +62,7 @@ Name | Description [journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true [journal-sync-transactional](persistence.md) | if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true [journal-type](persistence.md) | the type of journal to use. Default=ASYNCIO +[journal-datasync](persistence.md) | It will use fsync on journal operations. Default=true. [large-messages-directory](large-messages.md "Configuring the server") | the directory to store large messages. Default=data/largemessages [management-address](management.md "Configuring Core Management") | the name of the management address to send management messages to. It is prefixed with "jms.queue" so that JMS clients can send messages to it. Default=jms.queue.activemq.management [management-notification-address](management.md "Configuring The Core Management Notification Address") | the name of the address that consumers bind to receive management notifications. Default=activemq.notifications diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index cee06f4473..6f9c481241 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -298,6 +298,10 @@ The message journal is configured using the following attributes in data files on the journal The default for this parameter is `30` + +- `journal-datasync` (default: true) + + This will disable the use of fdatasync on journal writes. ### An important note on disabling disk write cache. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java new file mode 100644 index 0000000000..c4c221406b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/SyncSendTest.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.persistence; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.io.File; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.jlibaio.LibaioContext; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class SyncSendTest extends ActiveMQTestBase { + + private static long totalRecordTime = -1; + private static final int RECORDS = 300; + private static final int MEASURE_RECORDS = 100; + private static final int WRAMP_UP = 100; + + @Parameterized.Parameters(name = "storage={0}, protocol={1}") + public static Collection getParameters() { + Object[] storages = new Object[]{"libaio", "nio", "null"}; + Object[] protocols = new Object[]{"core", "openwire", "amqp"}; + + ArrayList objects = new ArrayList<>(); + for (Object s : storages) { + if (s.equals("libaio") && !LibaioContext.isLoaded()) { + continue; + } + for (Object p : protocols) { + objects.add(new Object[]{s, p}); + } + } + + return objects; + } + + private final String storage; + private final String protocol; + + public SyncSendTest(String storage, String protocol) { + this.storage = storage; + this.protocol = protocol; + } + + ActiveMQServer server; + JMSServerManagerImpl jms; + + @Override + public void setUp() throws Exception { + super.setUp(); + + if (storage.equals("null")) { + server = createServer(false, true); + } else { + server = createServer(true, true); + } + + jms = new JMSServerManagerImpl(server); + + if (storage.equals("libaio")) { + server.getConfiguration().setJournalType(JournalType.ASYNCIO); + } else { + server.getConfiguration().setJournalType(JournalType.NIO); + + } + jms.start(); + } + + private long getTimePerSync() throws Exception { + + if (storage.equals("null")) { + return 0; + } + if (totalRecordTime < 0) { + File measureFile = temporaryFolder.newFile(); + + System.out.println("File::" + measureFile); + + RandomAccessFile rfile = new RandomAccessFile(measureFile, "rw"); + FileChannel channel = rfile.getChannel(); + + channel.position(0); + + ByteBuffer buffer = ByteBuffer.allocate(10); + buffer.put(new byte[10]); + buffer.position(0); + + Assert.assertEquals(10, channel.write(buffer)); + channel.force(true); + + long time = System.nanoTime(); + + for (int i = 0; i < MEASURE_RECORDS + WRAMP_UP; i++) { + if (i == WRAMP_UP) { + time = System.nanoTime(); + } + channel.position(0); + buffer.position(0); + buffer.putInt(i); + buffer.position(0); + Assert.assertEquals(10, channel.write(buffer)); + channel.force(false); + } + + long timeEnd = System.nanoTime(); + + totalRecordTime = ((timeEnd - time) / MEASURE_RECORDS) * RECORDS; + + System.out.println("total time = " + totalRecordTime); + + } + return totalRecordTime; + + } + + @Test + public void testSendConsumeAudoACK() throws Exception { + + long recordTime = getTimePerSync(); + + jms.createQueue(true, "queue", null, true, null); + + ConnectionFactory factory = newCF(); + + Connection connection = factory.createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue; + if (protocol.equals("amqp")) { + queue = session.createQueue("jms.queue.queue"); + } else { + queue = session.createQueue("queue"); + } + MessageProducer producer = session.createProducer(queue); + + long start = System.nanoTime(); + + for (int i = 0; i < (RECORDS + WRAMP_UP); i++) { + if (i == WRAMP_UP) { + start = System.nanoTime(); // wramp up + } + producer.send(session.createMessage()); + } + + long end = System.nanoTime(); + + System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start)); + System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime)); + + if ((end - start) < recordTime) { + Assert.fail("Messages are being sent too fast! Faster than the disk would be able to sync!"); + } + + connection.start(); + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 0; i < (RECORDS + WRAMP_UP); i++) { + if (i == WRAMP_UP) { + start = System.nanoTime(); // wramp up + } + Message msg = consumer.receive(5000); + Assert.assertNotNull(msg); + } + + end = System.nanoTime(); + + System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start)); + System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime)); + + // There's no way to sync on ack for AMQP + if (!protocol.equals("amqp") && (end - start) < recordTime) { + Assert.fail("Messages are being acked too fast! Faster than the disk would be able to sync!"); + } + } finally { + connection.close(); + } + + } + + // this will set ack as synchronous, to make sure we make proper measures against the sync on disk + private ConnectionFactory newCF() { + if (protocol.equals("core")) { + ConnectionFactory factory = new ActiveMQConnectionFactory(); + ((ActiveMQConnectionFactory) factory).setBlockOnAcknowledge(true); + return factory; + } else if (protocol.equals("amqp")) { + final JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); + factory.setForceAsyncAcks(true); + return factory; + } else { + org.apache.activemq.ActiveMQConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true"); + cf.setSendAcksAsync(false); + return cf; + } + + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java index 84a3ecc43d..c445a86e2c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/vertx/ActiveMQVertxUnitTest.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServi import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.vertx.java.core.Handler; import org.vertx.java.core.Vertx; @@ -48,8 +49,10 @@ import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory; /** * This class tests the basics of ActiveMQ - * vertx integration + * vertx inte + * gration */ +@Ignore public class ActiveMQVertxUnitTest extends ActiveMQTestBase { private PlatformManager vertxManager; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java index d0676eebc2..0316945bc3 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java @@ -59,6 +59,16 @@ public class FakeSequentialFileFactory implements SequentialFileFactory { this(1, false); } + @Override + public SequentialFileFactory setDatasync(boolean enabled) { + return null; + } + + @Override + public boolean isDatasync() { + return false; + } + @Override public int getMaxIO() { return 1;