From 246d11c6b19b8210f6da96cd6f71ad64b0862d50 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 6 Jul 2016 17:35:23 -0400 Subject: [PATCH] ARTEMIS-616 Use Call timeout on replication flow control --- .../artemis/core/replication/ReplicationManager.java | 7 +++++-- .../artemis/core/server/cluster/ClusterConnection.java | 4 ++++ .../core/server/cluster/impl/ClusterConnectionImpl.java | 5 +++++ .../core/server/impl/SharedNothingLiveActivation.java | 2 +- .../tests/integration/replication/ReplicationTest.java | 2 +- 5 files changed, 16 insertions(+), 4 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 58102d4438..b254d9ac54 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -126,6 +126,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene private CoreRemotingConnection remotingConnection; + private final long timeout; + private volatile boolean inSync = true; private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0); @@ -133,10 +135,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene /** * @param remotingConnection */ - public ReplicationManager(CoreRemotingConnection remotingConnection, final ExecutorFactory executorFactory) { + public ReplicationManager(CoreRemotingConnection remotingConnection, final long timeout, final ExecutorFactory executorFactory) { this.executorFactory = executorFactory; this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); this.remotingConnection = remotingConnection; + this.timeout = timeout; } public void appendUpdateRecord(final byte journalID, @@ -384,7 +387,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene writable.set(false); //don't wait for ever as this may hang tests etc, we've probably been closed anyway long now = System.currentTimeMillis(); - long deadline = now + 5000; + long deadline = now + timeout; while (!writable.get() && now < deadline) { replicationLock.wait(deadline - now); now = System.currentTimeMillis(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java index 7134bd3e8e..c47ff48b0b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java @@ -77,4 +77,8 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis void removeRecord(String targetNodeID); void disconnectRecord(String targetNodeID); + + long getCallTimeout(); + + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 800ed5ab5c..77f25e9e8a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -553,6 +553,11 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn return rec.getBridge().isConnected(); } + @Override + public long getCallTimeout() { + return callTimeout; + } + @Override public Map getNodes() { synchronized (recordsGuard) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 6b222fb9bf..f17bcc5693 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -162,7 +162,7 @@ public class SharedNothingLiveActivation extends LiveActivation { ReplicationFailureListener listener = new ReplicationFailureListener(); rc.addCloseListener(listener); rc.addFailureListener(listener); - replicationManager = new ReplicationManager(rc, activeMQServer.getExecutorFactory()); + replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory()); replicationManager.start(); Thread t = new Thread(new Runnable() { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 4a00caabb2..6ff0cf041e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -191,7 +191,7 @@ public final class ReplicationTest extends ActiveMQTestBase { setupServer(false); try { ClientSessionFactory sf = createSessionFactory(locator); - manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), factory); + manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory); addActiveMQComponent(manager); manager.start(); Assert.fail("Exception was expected");