diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index 2c6033b536..800d960d56 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -444,11 +444,11 @@ public class JournalFilesRepository { pushOpen(); nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS); - } - - if (openedFiles.isEmpty()) { - // if empty, push to open one. - pushOpen(); + } else { + if (openedFiles.isEmpty()) { + // if empty, push to open one. + pushOpen(); + } } if (nextFile == null) { diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index af443c4644..8092cd061a 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQShutdownException; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.io.DummyCallback; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -840,7 +841,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize); } - final SimpleFuture result = newSyncAndCallbackResult(sync, callback); + final SimpleFuture result = newSyncAndCallbackResult(callback); appendExecutor.execute(new Runnable() { @Override public void run() { @@ -932,7 +933,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal Object record, boolean sync, IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { - final SimpleFuture result = newSyncAndCallbackResult(sync, callback); + final SimpleFuture result = newSyncAndCallbackResult(callback); appendExecutor.execute(new Runnable() { @Override public void run() { @@ -1016,7 +1017,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private void internalAppendDeleteRecord(long id, boolean sync, IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { - final SimpleFuture result = newSyncAndCallbackResult(sync, callback); + final SimpleFuture result = newSyncAndCallbackResult(callback); appendExecutor.execute(new Runnable() { @Override public void run() { @@ -1063,8 +1064,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal result.get(); } - private static SimpleFuture newSyncAndCallbackResult(boolean sync, IOCompletion callback) { - return (sync && callback == null) ? new SimpleFutureImpl<>() : SimpleFuture.dumb(); + private static SimpleFuture newSyncAndCallbackResult(IOCompletion callback) { + if (callback != null && callback != DummyCallback.getInstance()) { + return SimpleFuture.dumb(); + } else { + return new SimpleFutureImpl<>(); + } } @Override @@ -1290,7 +1295,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal logger.trace("scheduling appendPrepareRecord::txID=" + txID); } - final SimpleFuture result = newSyncAndCallbackResult(sync, callback); + final SimpleFuture result = newSyncAndCallbackResult(callback); appendExecutor.execute(new Runnable() { @Override @@ -1376,7 +1381,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } - final SimpleFuture result = newSyncAndCallbackResult(sync, callback); + final SimpleFuture result = newSyncAndCallbackResult(callback); appendExecutor.execute(new Runnable() { @Override @@ -1430,7 +1435,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal - final SimpleFuture result = newSyncAndCallbackResult(sync, callback); + final SimpleFuture result = newSyncAndCallbackResult(callback); appendExecutor.execute(new Runnable() { @Override public void run() { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index c6e0173591..f615c29666 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -117,6 +117,31 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { stopJournal(); } + @Test + public void testFlushAppendsAndDeletes() throws Exception { + setup(10, 10 * 1024, true); + createJournal(); + startJournal(); + load(); + byte[] record = new byte[1000]; + for (int i = 0; i < record.length; i++) { + record[i] = (byte) 'a'; + } + // Appending records after restart should be valid (not throwing any + // exceptions) + for (int i = 0; i < 10_000; i++) { + journal.appendAddRecord(i, (byte) 1, new SimpleEncoding(2, (byte) 'a'), false); + journal.appendDeleteRecord(i, false); + } + stopJournal(); + + List files = fileFactory.listFiles(fileExtension); + + // I am allowing one extra as a possible race with pushOpenFiles. I have not seen it happening on my test + // but it wouldn't be a problem if it happened + Assert.assertTrue("Supposed to have up to 10 files", files.size() <= 11); + } + @Test public void testParams() throws Exception { try {