ARTEMIS-1570 Flush appendExecutor before take journal snapshot
When live start replication, it must make sure there is no pending write in message & bindings journal, or we may lost journal records during initial replication. So we need flush append executor after acquire StorageManager's write lock, before Journal's write lock. Also we set a 10 seconds timeout when flush, the same as Journal::flushExecutor. If we failed to flush in 10 seconds, we abort replication, backup will try again later. Use OrderedExecutorFactory::flushExecutor to flush executor
This commit is contained in:
parent
9f77514225
commit
c4bfb9521f
|
@ -2237,6 +2237,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean flushAppendExecutor(long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
|
return OrderedExecutorFactory.flushExecutor(appendExecutor, timeout, unit);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getDataFilesCount() {
|
public int getDataFilesCount() {
|
||||||
return filesRepository.getDataFilesCount();
|
return filesRepository.getDataFilesCount();
|
||||||
|
|
|
@ -568,6 +568,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
throw new ActiveMQIllegalStateException("already replicating");
|
throw new ActiveMQIllegalStateException("already replicating");
|
||||||
replicator = replicationManager;
|
replicator = replicationManager;
|
||||||
|
|
||||||
|
if (!((JournalImpl) originalMessageJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) {
|
||||||
|
throw new Exception("Live message journal is busy");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!((JournalImpl) originalBindingsJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) {
|
||||||
|
throw new Exception("Live bindings journal is busy");
|
||||||
|
}
|
||||||
|
|
||||||
// Establishes lock
|
// Establishes lock
|
||||||
originalMessageJournal.synchronizationLock();
|
originalMessageJournal.synchronizationLock();
|
||||||
originalBindingsJournal.synchronizationLock();
|
originalBindingsJournal.synchronizationLock();
|
||||||
|
|
Loading…
Reference in New Issue