From a6bf7d0e040eadfb5800651277edfe25191490ba Mon Sep 17 00:00:00 2001 From: franz1981 Date: Thu, 13 Aug 2020 20:31:10 +0200 Subject: [PATCH] ARTEMIS-2837 Bursts of open files under high load This would prevent to push a new opened file if there is already one available to be consumed --- .../journal/impl/JournalFilesRepository.java | 23 +++++++++-------- .../journal/impl/JournalImplTestUnit.java | 25 +++++++++++++++++++ 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java index 2c6033b536..0765b7ac11 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java @@ -88,11 +88,14 @@ public class JournalFilesRepository { private final Runnable pushOpenRunnable = new Runnable() { @Override public void run() { - try { - pushOpenedFile(); - } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.errorPushingFile(e); - fileFactory.onIOError(e, "unable to open ", null); + // if there's already an opened file there is no need to push a new one + if (openedFiles.isEmpty()) { + try { + pushOpenedFile(); + } catch (Exception e) { + ActiveMQJournalLogger.LOGGER.errorPushingFile(e); + fileFactory.onIOError(e, "unable to open ", null); + } } } }; @@ -444,11 +447,11 @@ public class JournalFilesRepository { pushOpen(); nextFile = openedFiles.poll(journalFileOpenTimeout, TimeUnit.SECONDS); - } - - if (openedFiles.isEmpty()) { - // if empty, push to open one. - pushOpen(); + } else { + if (openedFiles.isEmpty()) { + // if empty, push to open one. + pushOpen(); + } } if (nextFile == null) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index c6e0173591..f615c29666 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -117,6 +117,31 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { stopJournal(); } + @Test + public void testFlushAppendsAndDeletes() throws Exception { + setup(10, 10 * 1024, true); + createJournal(); + startJournal(); + load(); + byte[] record = new byte[1000]; + for (int i = 0; i < record.length; i++) { + record[i] = (byte) 'a'; + } + // Appending records after restart should be valid (not throwing any + // exceptions) + for (int i = 0; i < 10_000; i++) { + journal.appendAddRecord(i, (byte) 1, new SimpleEncoding(2, (byte) 'a'), false); + journal.appendDeleteRecord(i, false); + } + stopJournal(); + + List files = fileFactory.listFiles(fileExtension); + + // I am allowing one extra as a possible race with pushOpenFiles. I have not seen it happening on my test + // but it wouldn't be a problem if it happened + Assert.assertTrue("Supposed to have up to 10 files", files.size() <= 11); + } + @Test public void testParams() throws Exception { try {