From c4bfb9521fd322c7179d31d5b5f7acf3f25d32dd Mon Sep 17 00:00:00 2001 From: shoukun Date: Wed, 27 Dec 2017 10:23:33 +0800 Subject: [PATCH] ARTEMIS-1570 Flush appendExecutor before take journal snapshot When live start replication, it must make sure there is no pending write in message & bindings journal, or we may lost journal records during initial replication. So we need flush append executor after acquire StorageManager's write lock, before Journal's write lock. Also we set a 10 seconds timeout when flush, the same as Journal::flushExecutor. If we failed to flush in 10 seconds, we abort replication, backup will try again later. Use OrderedExecutorFactory::flushExecutor to flush executor --- .../activemq/artemis/core/journal/impl/JournalImpl.java | 4 ++++ .../persistence/impl/journal/JournalStorageManager.java | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 5f31a2b216..77bf9da5e2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2237,6 +2237,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } + public boolean flushAppendExecutor(long timeout, TimeUnit unit) throws InterruptedException { + return OrderedExecutorFactory.flushExecutor(appendExecutor, timeout, unit); + } + @Override public int getDataFilesCount() { return filesRepository.getDataFilesCount(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 87f4fc9982..c54a3b77b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -568,6 +568,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw new ActiveMQIllegalStateException("already replicating"); replicator = replicationManager; + if (!((JournalImpl) originalMessageJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live message journal is busy"); + } + + if (!((JournalImpl) originalBindingsJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live bindings journal is busy"); + } + // Establishes lock originalMessageJournal.synchronizationLock(); originalBindingsJournal.synchronizationLock();