This closes #327

This commit is contained in:
Clebert Suconic 2016-01-19 13:02:09 -05:00
commit 54222d8667
1 changed files with 27 additions and 5 deletions

View File

@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
@ -65,6 +66,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
@ -76,7 +78,7 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
*
* @see ReplicationEndpoint
*/
public final class ReplicationManager implements ActiveMQComponent {
public final class ReplicationManager implements ActiveMQComponent, ReadyListener {
public enum ADD_OPERATION_TYPE {
UPDATE {
@ -109,6 +111,7 @@ public final class ReplicationManager implements ActiveMQComponent {
private final Object replicationLock = new Object();
private final ReusableLatch latch = new ReusableLatch();
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
private final ExecutorFactory executorFactory;
@ -261,11 +264,16 @@ public final class ReplicationManager implements ActiveMQComponent {
return;
}
enabled = false;
// This is to avoid the write holding a lock while we are trying to close it
if (replicatingChannel != null) {
replicatingChannel.close();
replicatingChannel.getConnection().getTransportConnection().fireReady(true);
latch.setCount(0);
}
synchronized (replicationLock) {
enabled = false;
if (replicatingChannel != null) {
replicatingChannel.close();
}
clearReplicationTokens();
}
@ -332,6 +340,15 @@ public final class ReplicationManager implements ActiveMQComponent {
synchronized (replicationLock) {
if (enabled) {
pendingTokens.add(repliToken);
if (!replicatingChannel.getConnection().isWritable(this)) {
latch.countUp();
try {
latch.await();
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
replicatingChannel.send(packet);
}
else {
@ -349,6 +366,11 @@ public final class ReplicationManager implements ActiveMQComponent {
return repliToken;
}
@Override
public void readyForWriting() {
latch.countDown();
}
/**
* @throws IllegalStateException By default, all replicated packets generate a replicated
* response. If your packets are triggering this exception, it may be because the