diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 5f31a2b216..77bf9da5e2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2237,6 +2237,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } + public boolean flushAppendExecutor(long timeout, TimeUnit unit) throws InterruptedException { + return OrderedExecutorFactory.flushExecutor(appendExecutor, timeout, unit); + } + @Override public int getDataFilesCount() { return filesRepository.getDataFilesCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 87f4fc9982..c54a3b77b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -568,6 +568,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw new ActiveMQIllegalStateException("already replicating"); replicator = replicationManager; + if (!((JournalImpl) originalMessageJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live message journal is busy"); + } + + if (!((JournalImpl) originalBindingsJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live bindings journal is busy"); + } + // Establishes lock originalMessageJournal.synchronizationLock(); originalBindingsJournal.synchronizationLock();