diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index c1ef0d32be..8807f4eb3e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -584,7 +584,12 @@ public class JournalStorageManager extends AbstractJournalStorageManager { SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName); if (!seqFile.exists()) continue; - replicator.syncLargeMessageFile(seqFile, size, id); + if (replicator != null) { + replicator.syncLargeMessageFile(seqFile, size, id); + } + else { + throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull(); + } } } @@ -711,4 +716,4 @@ public class JournalStorageManager extends AbstractJournalStorageManager { readUnLock(); } } -} \ No newline at end of file +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index e84257f2f0..8f0774e052 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -110,9 +110,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene private volatile boolean enabled; + private final AtomicBoolean writable = new AtomicBoolean(false); + private final Object replicationLock = new Object(); - private final ReusableLatch latch = new ReusableLatch(); private final Queue pendingTokens = new ConcurrentLinkedQueue<>(); private final ExecutorFactory executorFactory; @@ -271,10 +272,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene if (replicatingChannel != null) { replicatingChannel.close(); replicatingChannel.getConnection().getTransportConnection().fireReady(true); - latch.setCount(0); } synchronized (replicationLock) { + writable.set(true); + replicationLock.notifyAll(); clearReplicationTokens(); } @@ -342,10 +344,15 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene if (enabled) { pendingTokens.add(repliToken); if (!replicatingChannel.getConnection().isWritable(this)) { - latch.countUp(); try { //don't wait for ever as this may hang tests etc, we've probably been closed anyway - latch.await(5, TimeUnit.SECONDS); + long now = System.currentTimeMillis(); + long deadline = now + 5000; + while (!writable.get() && now < deadline) { + replicationLock.wait(deadline - now); + now = System.currentTimeMillis(); + } + writable.set(false); } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); @@ -370,7 +377,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene @Override public void readyForWriting() { - latch.countDown(); + synchronized (replicationLock) { + writable.set(true); + replicationLock.notifyAll(); + } } /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index da1aee44aa..4de94b971a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -368,4 +368,7 @@ public interface ActiveMQMessageBundle { @Message(id = 119116, value = "Netty Acceptor unavailable", format = Message.Format.MESSAGE_FORMAT) IllegalStateException acceptorUnavailable(); + + @Message(id = 119117, value = "Replicator is null. Replication was likely terminated.") + ActiveMQIllegalStateException replicatorIsNull(); }