ARTEMIS-350 - possible OOM in replication manager

https://issues.apache.org/jira/browse/ARTEMIS-350
This commit is contained in:
Andy Taylor 2016-01-19 10:11:17 +00:00 committed by Clebert Suconic
parent 8e4aca126b
commit b7f0d14b18
1 changed files with 21 additions and 1 deletions

View File

@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
@ -65,6 +66,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
@ -76,7 +78,7 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
*
* @see ReplicationEndpoint
*/
public final class ReplicationManager implements ActiveMQComponent {
public final class ReplicationManager implements ActiveMQComponent, ReadyListener {
public enum ADD_OPERATION_TYPE {
UPDATE {
@ -109,6 +111,7 @@ public final class ReplicationManager implements ActiveMQComponent {
private final Object replicationLock = new Object();
private final ReusableLatch latch = new ReusableLatch();
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
private final ExecutorFactory executorFactory;
@ -260,6 +263,8 @@ public final class ReplicationManager implements ActiveMQComponent {
if (!started) {
return;
}
replicatingChannel.getConnection().getTransportConnection().fireReady(true);
latch.setCount(0);
synchronized (replicationLock) {
enabled = false;
@ -273,6 +278,7 @@ public final class ReplicationManager implements ActiveMQComponent {
if (toStop != null) {
toStop.removeFailureListener(failureListener);
}
remotingConnection = null;
started = false;
}
@ -332,6 +338,15 @@ public final class ReplicationManager implements ActiveMQComponent {
synchronized (replicationLock) {
if (enabled) {
pendingTokens.add(repliToken);
if (!replicatingChannel.getConnection().isWritable(this)) {
latch.countUp();
try {
latch.await();
}
catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
}
replicatingChannel.send(packet);
}
else {
@ -349,6 +364,11 @@ public final class ReplicationManager implements ActiveMQComponent {
return repliToken;
}
@Override
public void readyForWriting() {
latch.countDown();
}
/**
* @throws IllegalStateException By default, all replicated packets generate a replicated
* response. If your packets are triggering this exception, it may be because the