From a1add097348094d37392230bb72b55ab22530f55 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 19 Dec 2023 12:57:36 -0500 Subject: [PATCH] ARTEMIS-4543 Journal/NIO Sync is not scalling up with TimedBuffer --- .../core/io/AbstractSequentialFile.java | 25 +- .../artemis/core/io/DelegateCallback.java | 10 +- .../core/io/SequentialFileFactory.java | 4 + .../core/io/aio/AIOSequentialFileFactory.java | 5 + .../artemis/core/io/buffer/TimedBuffer.java | 18 +- .../core/io/buffer/TimedBufferObserver.java | 7 + .../core/io/mapped/TimedSequentialFile.java | 29 +- .../core/io/nio/NIOSequentialFile.java | 10 +- .../artemis/core/journal/Journal.java | 4 + .../core/journal/impl/JournalImpl.java | 5 + .../artemis/core/io/DelegateCallbackTest.java | 17 +- .../core/journal/impl/BatchCommitTest.java | 257 ++++++++++++++++++ .../journal/impl/JournalImplTestBase.java | 35 ++- .../journal/impl/JournalImplTestUnit.java | 2 - 14 files changed, 395 insertions(+), 33 deletions(-) create mode 100644 tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/BatchCommitTest.java diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java index 3d9d52dc25..aa79f5b8d6 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java @@ -29,6 +29,7 @@ import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver; @@ -271,6 +272,11 @@ public abstract class AbstractSequentialFile implements SequentialFile { protected class LocalBufferObserver implements TimedBufferObserver { + @Override + public boolean supportSync() { + return factory.isSyncSupported(); + } + @Override public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List callbacks) { final int bytes = byteBuf.readableBytes(); @@ -285,12 +291,29 @@ public abstract class AbstractSequentialFile implements SequentialFile { ByteUtil.zeros(buffer, bytes, missingNonZeroedBytes); } buffer.flip(); - writeDirect(buffer, requestedSync, new DelegateCallback(callbacks)); + writeDirect(buffer, requestedSync, DelegateCallback.wrap(callbacks)); } else { IOCallback.done(callbacks); } } + @Override + public void checkSync(boolean syncRequested, List callbacks) { + if (syncRequested) { + try { + sync(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + if (callbacks != null) { + callbacks.forEach(c -> c.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage())); + } + } + } + if (callbacks != null) { + callbacks.forEach(c -> c.done()); + } + } + @Override public int getRemainingBytes() { if (fileSize - position.get() > Integer.MAX_VALUE) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DelegateCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DelegateCallback.java index 12d20b5718..b033babb51 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DelegateCallback.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DelegateCallback.java @@ -26,12 +26,20 @@ public final class DelegateCallback implements IOCallback { private final Collection delegates; + public static DelegateCallback wrap(final Collection delegates) { + if (delegates == null) { + return null; + } else { + return new DelegateCallback(delegates); + } + } + /** * It doesn't copy defensively the given {@code delegates}. * * @throws NullPointerException if {@code delegates} is {@code null} */ - public DelegateCallback(final Collection delegates) { + private DelegateCallback(final Collection delegates) { Objects.requireNonNull(delegates, "delegates cannot be null!"); this.delegates = delegates; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java index fa4891e7a4..b4c9567baf 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java @@ -46,6 +46,10 @@ public interface SequentialFileFactory { int getMaxIO(); + default boolean isSyncSupported() { + return true; + } + /** * Lists files that end with the given extension. *

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index 6db54bf6cd..91b093eecf 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -76,6 +76,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor private static final String AIO_TEST_FILE = ".aio-test"; + @Override + public boolean isSyncSupported() { + return false; + } + public void beforeClose() { } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index fd3dbf8b23..a46c2a9e9f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -312,6 +312,9 @@ public final class TimedBuffer extends CriticalComponentImpl { * @return {@code true} when are flushed any bytes, {@code false} otherwise */ public boolean flushBatch() { + List syncCallbackList = null; + boolean localUseSync = false; + TimedBufferObserver syncBufferObserver = null; try (ArtemisCloseable measure = measureCritical(CRITICAL_PATH_FLUSH)) { synchronized (this) { if (!started) { @@ -325,7 +328,16 @@ public final class TimedBuffer extends CriticalComponentImpl { bytesFlushed.addAndGet(pos); } - bufferObserver.flushBuffer(buffer.byteBuf(), pendingSync, callbacks); + if (bufferObserver.supportSync()) { + // performing the sync away from the lock + // so other writes can be performed while that flush is happening + syncCallbackList = callbacks; + localUseSync = pendingSync; + syncBufferObserver = bufferObserver; + bufferObserver.flushBuffer(buffer.byteBuf(), false, null); + } else { + bufferObserver.flushBuffer(buffer.byteBuf(), pendingSync, callbacks); + } stopSpin(); @@ -345,6 +357,10 @@ public final class TimedBuffer extends CriticalComponentImpl { return false; } } + } finally { + if (syncBufferObserver != null) { + syncBufferObserver.checkSync(localUseSync, syncCallbackList); + } } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java index 36f6602373..9e877d3d19 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java @@ -29,6 +29,13 @@ public interface TimedBufferObserver { */ void flushBuffer(ByteBuf buffer, boolean syncRequested, List callbacks); + default void checkSync(boolean syncRequested, List callbacks) { + } + + default boolean supportSync() { + return false; + } + /** * Return the number of remaining bytes that still fit on the observer (file) */ diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java index fac4d5ef5e..fa93e1c13b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.io.mapped; import java.io.File; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.nio.ByteBuffer; import java.util.List; @@ -34,9 +35,14 @@ import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class TimedSequentialFile implements SequentialFile { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final SequentialFileFactory factory; private final SequentialFile sequentialFile; private final LocalBufferObserver observer; @@ -247,6 +253,11 @@ final class TimedSequentialFile implements SequentialFile { private final class LocalBufferObserver implements TimedBufferObserver { + @Override + public boolean supportSync() { + return true; + } + @Override public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List callbacks) { final int bytes = byteBuf.readableBytes(); @@ -266,8 +277,7 @@ final class TimedSequentialFile implements SequentialFile { buffer.flip(); } try { - blockingWriteDirect(buffer, requestedSync, releaseBuffer); - IOCallback.done(callbacks); + blockingWriteDirect(buffer, false, releaseBuffer); } catch (Throwable t) { final int code; if (t instanceof IOException) { @@ -278,8 +288,19 @@ final class TimedSequentialFile implements SequentialFile { } IOCallback.onError(callbacks, code, t.getMessage()); } - } else { - IOCallback.done(callbacks); + } + } + + @Override + public void checkSync(boolean syncRequested, List callbacks) { + try { + sync(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } finally { + if (callbacks != null) { + callbacks.forEach(c -> c.done()); + } } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index 0a8b2ebb47..3dd7be7b0d 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -383,11 +383,11 @@ public class NIOSequentialFile extends AbstractSequentialFile { } @Override - public void blockingWriteDirect(ByteBuffer bytes,boolean sync, boolean releaseBuffer) throws Exception { + public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception { internalWrite(bytes, sync, null, releaseBuffer); } - private void internalWrite(final ByteBuffer bytes, + private synchronized void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback, boolean releaseBuffer) throws IOException, ActiveMQIOErrorException, InterruptedException { @@ -474,12 +474,14 @@ public class NIOSequentialFile extends AbstractSequentialFile { //enable zero copy case if (byteBuf.nioBufferCount() == 1 && byteBuf.isDirect()) { final ByteBuffer buffer = byteBuf.internalNioBuffer(byteBuf.readerIndex(), bytes); - final IOCallback callback = new DelegateCallback(callbacks); + final IOCallback callback = DelegateCallback.wrap(callbacks); try { //no need to pool the buffer and don't care if the NIO buffer got modified internalWrite(buffer, requestedSync, callback, false); } catch (Exception e) { - callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage()); + if (callbacks != null) { + callbacks.forEach(c -> c.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage())); + } } } else { super.flushBuffer(byteBuf, requestedSync, callbacks); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index 2dba3ed2a7..d38f0cf774 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -81,6 +81,10 @@ public interface Journal extends ActiveMQComponent { return this; } + default File getHistoryFolder() { + return null; + } + void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; void appendAddRecord(long id, diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 74b887de02..6a3abac2f8 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -239,6 +239,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal return journalRetentionFolder != null; } + @Override + public File getHistoryFolder() { + return journalRetentionFolder; + } + @Override public JournalImpl setHistoryFolder(File historyFolder, long maxBytes, long period) throws Exception { diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/DelegateCallbackTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/DelegateCallbackTest.java index 21fa8d7422..db7c15dbb1 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/DelegateCallbackTest.java +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/DelegateCallbackTest.java @@ -25,11 +25,6 @@ import org.junit.Test; public class DelegateCallbackTest { - @Test(expected = NullPointerException.class) - public void shouldFailWithNullDelegates() { - new DelegateCallback(null); - } - private static final class CountingIOCallback implements IOCallback { long done = 0; @@ -60,7 +55,7 @@ public class DelegateCallbackTest { @Test public void shouldCallDoneOnEachCallback() { final CountingIOCallback countingIOCallback = new CountingIOCallback(false); - final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback)); + final DelegateCallback callback = DelegateCallback.wrap(Arrays.asList(countingIOCallback, countingIOCallback)); callback.done(); Assert.assertEquals(2, countingIOCallback.done); Assert.assertEquals(0, countingIOCallback.onError); @@ -69,7 +64,7 @@ public class DelegateCallbackTest { @Test public void shouldCallOnErrorOnEachCallback() { final CountingIOCallback countingIOCallback = new CountingIOCallback(false); - final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback)); + final DelegateCallback callback = DelegateCallback.wrap(Arrays.asList(countingIOCallback, countingIOCallback)); callback.onError(0, "not a real error"); Assert.assertEquals(0, countingIOCallback.done); Assert.assertEquals(2, countingIOCallback.onError); @@ -78,7 +73,7 @@ public class DelegateCallbackTest { @Test public void shouldCallDoneOnEachCallbackWithExceptions() { final CountingIOCallback countingIOCallback = new CountingIOCallback(true); - final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback)); + final DelegateCallback callback = DelegateCallback.wrap(Arrays.asList(countingIOCallback, countingIOCallback)); callback.done(); Assert.assertEquals(2, countingIOCallback.done); Assert.assertEquals(0, countingIOCallback.onError); @@ -87,7 +82,7 @@ public class DelegateCallbackTest { @Test public void shouldCallOnErrorOnEachCallbackWithExceptions() { final CountingIOCallback countingIOCallback = new CountingIOCallback(true); - final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback)); + final DelegateCallback callback = DelegateCallback.wrap(Arrays.asList(countingIOCallback, countingIOCallback)); callback.onError(0, "not a real error"); Assert.assertEquals(0, countingIOCallback.done); Assert.assertEquals(2, countingIOCallback.onError); @@ -97,7 +92,7 @@ public class DelegateCallbackTest { public void shouldLogOnDoneForEachExceptions() throws Exception { try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { final CountingIOCallback countingIOCallback = new CountingIOCallback(true); - final DelegateCallback callback = new DelegateCallback(Collections.singleton(countingIOCallback)); + final DelegateCallback callback = DelegateCallback.wrap(Collections.singleton(countingIOCallback)); callback.done(); Assert.assertTrue(loggerHandler.findText("AMQ142024")); } @@ -107,7 +102,7 @@ public class DelegateCallbackTest { public void shouldLogOnErrorForEachExceptions() throws Exception { try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { final CountingIOCallback countingIOCallback = new CountingIOCallback(true); - final DelegateCallback callback = new DelegateCallback(Collections.singleton(countingIOCallback)); + final DelegateCallback callback = DelegateCallback.wrap(Collections.singleton(countingIOCallback)); callback.onError(0, "not a real error"); Assert.assertTrue(loggerHandler.findText("AMQ142025")); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/BatchCommitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/BatchCommitTest.java new file mode 100644 index 0000000000..621d235da3 --- /dev/null +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/BatchCommitTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.unit.core.journal.impl; + +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; +import org.apache.activemq.artemis.core.journal.IOCompletion; +import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; +import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.journal.impl.JournalImpl; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.SimpleIDGenerator; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; +import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BatchCommitTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int FILE_SIZE = 10 * 1024 * 1024; + private static final int MIN_FILES = 10; + private static final int POOL_SIZE = 10; + private static final String FILE_PREFIX = "journal-test"; + private static final String FILE_EXTENSION = "amq"; + private static final int BUFFER_SIZE = 100 * 1024; + private static final int BUFFER_TIMEOUT = 10 * 1024; + + private static final int MAX_AIO = 255; + + private static final int OK = 100; + private static final int ERROR = 101; + + private static final int RECORDS = 10000; + + JournalImpl journal; + + SequentialFileFactory journalFF; + + SimpleIDGenerator idGenerator = new SimpleIDGenerator(1); + + public Journal testRunNIO(String testFolder, boolean sync) throws Throwable { + return testRun(testFolder, JournalType.NIO, sync); + } + + public Journal testRunMapped(String testFolder, boolean sync) throws Throwable { + return testRun(testFolder, JournalType.MAPPED, sync); + } + + + public Journal testRunAIO(String testFolder, boolean sync) throws Throwable { + return testRun(testFolder, JournalType.ASYNCIO, sync); + } + + + public Journal testRun(String testFolder, JournalType journalType, boolean sync) throws Throwable { + + OrderedExecutorFactory orderedExecutorFactory = getExecutorFactory(); + setupJournal(journalType, testFolder, orderedExecutorFactory); + journal.start(); + runAfter(journal::stop); + journal.loadInternalOnly(); + + CountDownLatch latch = new CountDownLatch(RECORDS); + + ConcurrentHashSet existingRecords = new ConcurrentHashSet<>(); + + AtomicInteger errors = new AtomicInteger(0); + + + for (int i = 0; i < RECORDS; i++) { + long tx = idGenerator.generateID(); + long id = idGenerator.generateID(); + long upid = idGenerator.generateID(); + existingRecords.add(tx); + IOCompletion completion = new IOCompletion() { + @Override + public void storeLineUp() { + } + + @Override + public void done() { + if (!existingRecords.remove(tx)) { + errors.incrementAndGet(); + logger.warn("Id {} was removed before", tx); + } + latch.countDown(); + } + + @Override + public void onError(int errorCode, String errorMessage) { + } + }; + + journal.appendAddRecordTransactional(tx, id, (byte) 1, ("add " + id).getBytes()); + journal.appendUpdateRecordTransactional(tx, upid, (byte) 1, ("up " + upid).getBytes()); + journal.appendCommitRecord(tx, sync, completion, true); + } + + if (!latch.await(10, TimeUnit.SECONDS)) { + logger.warn("latch didn't finish, count={}", latch.getCount()); + errors.incrementAndGet(); + } + + existingRecords.forEach(l -> logger.warn("id {} still in the list", l)); + + Assert.assertEquals(0, errors.get()); + Assert.assertEquals(0, existingRecords.size()); + + return journal; + + } + + private OrderedExecutorFactory getExecutorFactory() { + ExecutorService service = Executors.newFixedThreadPool(10, new ThreadFactory() { + int counter = 0; + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("AsyncCommitTest" + (counter++)); + return t; + } + }); + OrderedExecutorFactory orderedExecutorFactory = new OrderedExecutorFactory(service); + runAfter(service::shutdownNow); + return orderedExecutorFactory; + } + + @Test + public void testNIO() throws Exception { + internalTest(JournalType.NIO, "testRunNIO", true); + } + + @Test + public void testNIONoSync() throws Exception { + internalTest(JournalType.NIO, "testRunNIO", false); + } + + @Test + public void testMapped() throws Exception { + internalTest(JournalType.MAPPED, "testRunMapped", true); + } + + @Test + public void testMappedNoSync() throws Exception { + internalTest(JournalType.MAPPED, "testRunMapped", false); + } + + @Test + public void testAIO() throws Exception { + Assume.assumeTrue(LibaioContext.isLoaded()); + internalTest(JournalType.ASYNCIO, "testRunAIO", true); + } + + @Test + public void testAIONoSync() throws Exception { + Assume.assumeTrue(LibaioContext.isLoaded()); + internalTest(JournalType.ASYNCIO, "testRunAIO", false); + } + + private void proceedCall(String testName, String testFolder, boolean sync) throws Exception { + Method method = getClass().getMethod(testName, String.class, Boolean.TYPE); + Journal journal = (Journal)method.invoke(this, testFolder, sync); + journal.stop(); + } + + private void internalTest(JournalType journalType, String testRunName, boolean sync) throws Exception { + proceedCall(testRunName, getTestDir(), sync); + + OrderedExecutorFactory orderedExecutorFactory = getExecutorFactory(); + setupJournal(journalType, getTestDir(), orderedExecutorFactory); + + ArrayList commited = new ArrayList<>(); + ArrayList prepared = new ArrayList<>(); + AtomicInteger failedTX = new AtomicInteger(0); + + journal.start(); + journal.load(commited, prepared, (id, records, toDelete) -> failedTX.incrementAndGet(), false); + runAfter(journal::stop); + + commited.forEach(r -> { + String dataAsString = new String(r.data); + logger.debug("data={}, isUpdate={}, id={}", dataAsString, r.isUpdate, r.id); + if (r.isUpdate) { + Assert.assertEquals("up " + r.id, dataAsString); + } else { + Assert.assertEquals("add " + r.id, dataAsString); + } + }); + Assert.assertEquals(RECORDS * 2, commited.size()); + Assert.assertEquals(0, failedTX.get()); + + + + } + + + + public void setupJournal(JournalType journalType, String location, ExecutorFactory executorFactory) { + + File locationFile = new File(location); + + switch (journalType) { + + case NIO: + journalFF = new NIOSequentialFileFactory(locationFile, true, BUFFER_SIZE, BUFFER_TIMEOUT, 1, true, null, null); + break; + case ASYNCIO: + journalFF = new AIOSequentialFileFactory(locationFile, BUFFER_SIZE, BUFFER_TIMEOUT, MAX_AIO, true, null, null); + break; + case MAPPED: + journalFF = new MappedSequentialFileFactory(locationFile, FILE_SIZE, true, BUFFER_SIZE, BUFFER_TIMEOUT, null); + break; + default: + throw new IllegalStateException("invalid journal type " + journalType); + } + + journal = new JournalImpl(executorFactory, FILE_SIZE, MIN_FILES, POOL_SIZE, 0, 0, 30_000, journalFF, FILE_PREFIX, FILE_EXTENSION, MAX_AIO, 1, null, 10); + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java index 10175314e6..3b18e357bf 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.cli.commands.tools.journal.DecodeJournal; import org.apache.activemq.artemis.cli.commands.tools.journal.EncodeJournal; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; @@ -199,7 +200,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { if (suportsRetention()) { // FakeSequentialFile won't support retention - File fileBackup = new File(getTestDir(), "backupFoler"); + File fileBackup = new File(getTestDir(), "backupFolder"); fileBackup.mkdirs(); ((JournalImpl) journal).setHistoryFolder(fileBackup, -1, -1); } @@ -691,35 +692,37 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { Assert.assertArrayEquals(expectedArray, actualArray); } catch (AssertionError e) { + logger.warn(e.getMessage(), e); + HashSet hashActual = new HashSet<>(); hashActual.addAll(actual); HashSet hashExpected = new HashSet<>(); hashExpected.addAll(expected); - logger.debug("#Summary **********************************************************************************************************************"); + logger.warn("#Summary **********************************************************************************************************************"); for (RecordInfo r : hashActual) { if (!hashExpected.contains(r)) { - logger.debug("Record {} was supposed to be removed and it exists", r); + logger.warn("Record {} was supposed to be removed and it exists", r); } } for (RecordInfo r : hashExpected) { if (!hashActual.contains(r)) { - logger.debug("Record {} was not found on actual list", r); + logger.warn("Record {} was not found on actual list", r); } } - logger.debug("#expected **********************************************************************************************************************"); + logger.warn("#expected **********************************************************************************************************************"); for (RecordInfo recordInfo : expected) { - logger.debug("Record::{}", recordInfo); + logger.warn("Record::{}", recordInfo); } - logger.debug("#actual ************************************************************************************************************************"); + logger.warn("#actual ************************************************************************************************************************"); for (RecordInfo recordInfo : actual) { - logger.debug("Record::{}", recordInfo); + logger.warn("Record::{}", recordInfo); } - logger.debug("#records ***********************************************************************************************************************"); + logger.warn("#records ***********************************************************************************************************************"); try { describeJournal(journal.getFileFactory(), (JournalImpl) journal, journal.getFileFactory().getDirectory(), System.out); @@ -727,6 +730,20 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { e2.printStackTrace(); } + logger.warn("#records on retention (history) folder ***********************************************************************************************************************"); + + try { + NIOSequentialFileFactory nioSequentialFileFactory = new NIOSequentialFileFactory(journal.getHistoryFolder(), 1); + + + JournalImpl backupJournal = new JournalImpl(journal.getFileSize(), journal.getMinFiles(), 10, 0, 0, nioSequentialFileFactory, "amq", "amq", 1); + + describeJournal(nioSequentialFileFactory, backupJournal, journal.getHistoryFolder(), System.out); + + } catch (Exception e2) { + e2.printStackTrace(); + } + } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index 35e07160f2..c5b14ab3c2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -906,8 +906,6 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { testReclaimTransactionalAdd(false); } - // TODO commit and rollback, also transactional deletes - private void testReclaimTransactionalAdd(final boolean commit) throws Exception { setup(10, 10 * 1024, true); createJournal();