This closes #400

This commit is contained in:
Clebert Suconic 2016-02-23 14:17:14 -05:00
commit 89aafa44bb
3 changed files with 26 additions and 8 deletions

View File

@ -584,7 +584,12 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName); SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName);
if (!seqFile.exists()) if (!seqFile.exists())
continue; continue;
replicator.syncLargeMessageFile(seqFile, size, id); if (replicator != null) {
replicator.syncLargeMessageFile(seqFile, size, id);
}
else {
throw ActiveMQMessageBundle.BUNDLE.replicatorIsNull();
}
} }
} }
@ -711,4 +716,4 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
readUnLock(); readUnLock();
} }
} }
} }

View File

@ -25,7 +25,7 @@ import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -110,9 +110,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
private volatile boolean enabled; private volatile boolean enabled;
private final AtomicBoolean writable = new AtomicBoolean(false);
private final Object replicationLock = new Object(); private final Object replicationLock = new Object();
private final ReusableLatch latch = new ReusableLatch();
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>(); private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
private final ExecutorFactory executorFactory; private final ExecutorFactory executorFactory;
@ -271,10 +272,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
if (replicatingChannel != null) { if (replicatingChannel != null) {
replicatingChannel.close(); replicatingChannel.close();
replicatingChannel.getConnection().getTransportConnection().fireReady(true); replicatingChannel.getConnection().getTransportConnection().fireReady(true);
latch.setCount(0);
} }
synchronized (replicationLock) { synchronized (replicationLock) {
writable.set(true);
replicationLock.notifyAll();
clearReplicationTokens(); clearReplicationTokens();
} }
@ -342,10 +344,15 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
if (enabled) { if (enabled) {
pendingTokens.add(repliToken); pendingTokens.add(repliToken);
if (!replicatingChannel.getConnection().isWritable(this)) { if (!replicatingChannel.getConnection().isWritable(this)) {
latch.countUp();
try { try {
//don't wait for ever as this may hang tests etc, we've probably been closed anyway //don't wait for ever as this may hang tests etc, we've probably been closed anyway
latch.await(5, TimeUnit.SECONDS); long now = System.currentTimeMillis();
long deadline = now + 5000;
while (!writable.get() && now < deadline) {
replicationLock.wait(deadline - now);
now = System.currentTimeMillis();
}
writable.set(false);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e); throw new ActiveMQInterruptedException(e);
@ -370,7 +377,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
@Override @Override
public void readyForWriting() { public void readyForWriting() {
latch.countDown(); synchronized (replicationLock) {
writable.set(true);
replicationLock.notifyAll();
}
} }
/** /**

View File

@ -368,4 +368,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 119116, value = "Netty Acceptor unavailable", format = Message.Format.MESSAGE_FORMAT) @Message(id = 119116, value = "Netty Acceptor unavailable", format = Message.Format.MESSAGE_FORMAT)
IllegalStateException acceptorUnavailable(); IllegalStateException acceptorUnavailable();
@Message(id = 119117, value = "Replicator is null. Replication was likely terminated.")
ActiveMQIllegalStateException replicatorIsNull();
} }