diff --git a/artemis-journal/pom.xml b/artemis-journal/pom.xml index 8afa3f98b9..f67d17564b 100644 --- a/artemis-journal/pom.xml +++ b/artemis-journal/pom.xml @@ -68,6 +68,10 @@ artemis-commons ${project.version} + + org.jctools + jctools-core + org.apache.activemq activemq-artemis-native diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index c89122d98b..b47e5f0919 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.io.aio; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,11 +36,12 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext; import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo; -import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.PowerOf2Util; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.jboss.logging.Logger; +import org.jctools.queues.MpmcArrayQueue; +import org.jctools.queues.atomic.MpmcAtomicArrayQueue; public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory { @@ -66,7 +68,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor volatile LibaioContext libaioContext; - private final CallbackCache callbackPool; + private final Queue callbackPool; private final AtomicBoolean running = new AtomicBoolean(false); @@ -96,14 +98,14 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor final IOCriticalErrorListener listener, final CriticalAnalyzer analyzer) { super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer); - callbackPool = new CallbackCache<>(maxIO); + callbackPool = PlatformDependent.hasUnsafe() ? new MpmcArrayQueue<>(maxIO) : new MpmcAtomicArrayQueue<>(maxIO); if (logger.isTraceEnabled()) { logger.trace("New AIO File Created"); } } public AIOSequentialCallback getCallback() { - AIOSequentialCallback callback = callbackPool.get(); + AIOSequentialCallback callback = callbackPool.poll(); if (callback == null) { callback = new AIOSequentialCallback(); } @@ -417,7 +419,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor buffersControl.bufferDone(buffer); } - callbackPool.put(AIOSequentialCallback.this); + callbackPool.offer(AIOSequentialCallback.this); } } }