This commit is contained in:
Clebert Suconic 2021-01-20 11:27:03 -05:00
commit 1183e24503
1 changed files with 19 additions and 5 deletions

View File

@ -593,7 +593,13 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
for (JournalFile jf : journalFiles) { for (JournalFile jf : journalFiles) {
if (!started) if (!started)
return; return;
replicator.syncJournalFile(jf, type);
ReplicationManager replicatorInUse = replicator;
if (replicatorInUse == null) {
throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull();
}
replicatorInUse.syncJournalFile(jf, type);
} }
} }
@ -724,11 +730,13 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName); SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName);
if (!seqFile.exists()) if (!seqFile.exists())
continue; continue;
if (replicator != null) {
replicator.syncLargeMessageFile(seqFile, size, id); ReplicationManager replicatorInUse = replicator;
} else { if (replicatorInUse == null) {
throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull(); throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull();
} }
replicatorInUse.syncLargeMessageFile(seqFile, size, id);
} }
} }
@ -796,8 +804,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
for (Map.Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet()) { for (Map.Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet()) {
if (!started) if (!started)
return; return;
ReplicationManager replicatorInUse = replicator;
if (replicatorInUse == null) {
throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull();
}
PagingStore store = manager.getPageStore(entry.getKey()); PagingStore store = manager.getPageStore(entry.getKey());
store.sendPages(replicator, entry.getValue()); store.sendPages(replicatorInUse, entry.getValue());
} }
} }