ARTEMIS-2928 blocking CallbackCache can be replaced with a JCTools lock-free queue

This commit is contained in:
franz1981 2020-10-02 08:17:40 +02:00
parent ecdcdd0f78
commit a680f7d52e
2 changed files with 11 additions and 5 deletions

View File

@ -68,6 +68,10 @@
<artifactId>artemis-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-artemis-native</artifactId>

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.io.aio;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@ -35,11 +36,12 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
import org.jctools.queues.MpmcArrayQueue;
import org.jctools.queues.atomic.MpmcAtomicArrayQueue;
public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory {
@ -66,7 +68,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
volatile LibaioContext<AIOSequentialCallback> libaioContext;
private final CallbackCache<AIOSequentialCallback> callbackPool;
private final Queue<AIOSequentialCallback> callbackPool;
private final AtomicBoolean running = new AtomicBoolean(false);
@ -96,14 +98,14 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
final IOCriticalErrorListener listener,
final CriticalAnalyzer analyzer) {
super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer);
callbackPool = new CallbackCache<>(maxIO);
callbackPool = PlatformDependent.hasUnsafe() ? new MpmcArrayQueue<>(maxIO) : new MpmcAtomicArrayQueue<>(maxIO);
if (logger.isTraceEnabled()) {
logger.trace("New AIO File Created");
}
}
public AIOSequentialCallback getCallback() {
AIOSequentialCallback callback = callbackPool.get();
AIOSequentialCallback callback = callbackPool.poll();
if (callback == null) {
callback = new AIOSequentialCallback();
}
@ -417,7 +419,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
buffersControl.bufferDone(buffer);
}
callbackPool.put(AIOSequentialCallback.this);
callbackPool.offer(AIOSequentialCallback.this);
}
}
}