From 851aef11723825f638c288ddd2ddf656cfcdaab2 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Fri, 7 Aug 2020 20:05:15 +0200 Subject: [PATCH] Revert "ARTEMIS-2837 Avoiding bursts on writes and pending callbacks" This reverts commit 1761f763 --- .../journal/impl/JournalFilesRepository.java | 10 ++++---- .../core/journal/impl/JournalImpl.java | 21 ++++++---------- .../journal/impl/JournalImplTestUnit.java | 25 ------------------- 3 files changed, 13 insertions(+), 43 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index 800d960d56..2c6033b536 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -444,11 +444,11 @@ public class JournalFilesRepository { pushOpen(); nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS); - } else { - if (openedFiles.isEmpty()) { - // if empty, push to open one. - pushOpen(); - } + } + + if (openedFiles.isEmpty()) { + // if empty, push to open one. + pushOpen(); } if (nextFile == null) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 8092cd061a..af443c4644 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -47,7 +47,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQShutdownException; import org.apache.activemq.artemis.api.core.Pair; -import org.apache.activemq.artemis.core.io.DummyCallback; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -841,7 +840,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize); } - final SimpleFuture result = newSyncAndCallbackResult(callback); + final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @Override public void run() { @@ -933,7 +932,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal Object record, boolean sync, IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { - final SimpleFuture result = newSyncAndCallbackResult(callback); + final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @Override public void run() { @@ -1017,7 +1016,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private void internalAppendDeleteRecord(long id, boolean sync, IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { - final SimpleFuture result = newSyncAndCallbackResult(callback); + final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @Override public void run() { @@ -1064,12 +1063,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal result.get(); } - private static SimpleFuture newSyncAndCallbackResult(IOCompletion callback) { - if (callback != null && callback != DummyCallback.getInstance()) { - return SimpleFuture.dumb(); - } else { - return new SimpleFutureImpl<>(); - } + private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) { + return (sync && callback == null) ? new SimpleFutureImpl<>() : SimpleFuture.dumb(); } @Override @@ -1295,7 +1290,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal logger.trace("scheduling appendPrepareRecord::txID=" + txID); } - final SimpleFuture result = newSyncAndCallbackResult(callback); + final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @Override @@ -1381,7 +1376,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } - final SimpleFuture result = newSyncAndCallbackResult(callback); + final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @Override @@ -1435,7 +1430,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal - final SimpleFuture result = newSyncAndCallbackResult(callback); + final SimpleFuture result = newSyncAndCallbackResult(sync, callback); appendExecutor.execute(new Runnable() { @Override public void run() { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index f615c29666..c6e0173591 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -117,31 +117,6 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { stopJournal(); } - @Test - public void testFlushAppendsAndDeletes() throws Exception { - setup(10, 10 * 1024, true); - createJournal(); - startJournal(); - load(); - byte[] record = new byte[1000]; - for (int i = 0; i < record.length; i++) { - record[i] = (byte) 'a'; - } - // Appending records after restart should be valid (not throwing any - // exceptions) - for (int i = 0; i < 10_000; i++) { - journal.appendAddRecord(i, (byte) 1, new SimpleEncoding(2, (byte) 'a'), false); - journal.appendDeleteRecord(i, false); - } - stopJournal(); - - List files = fileFactory.listFiles(fileExtension); - - // I am allowing one extra as a possible race with pushOpenFiles. I have not seen it happening on my test - // but it wouldn't be a problem if it happened - Assert.assertTrue("Supposed to have up to 10 files", files.size() <= 11); - } - @Test public void testParams() throws Exception { try {