ARTEMIS-616 Use Call timeout on replication flow control

This commit is contained in:
Clebert Suconic 2016-07-06 17:35:23 -04:00
parent 0ab88e6096
commit 246d11c6b1
5 changed files with 16 additions and 4 deletions

View File

@ -126,6 +126,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
private CoreRemotingConnection remotingConnection;
private final long timeout;
private volatile boolean inSync = true;
private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);
@ -133,10 +135,11 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
/**
* @param remotingConnection
*/
public ReplicationManager(CoreRemotingConnection remotingConnection, final ExecutorFactory executorFactory) {
public ReplicationManager(CoreRemotingConnection remotingConnection, final long timeout, final ExecutorFactory executorFactory) {
this.executorFactory = executorFactory;
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
this.remotingConnection = remotingConnection;
this.timeout = timeout;
}
public void appendUpdateRecord(final byte journalID,
@ -384,7 +387,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene
writable.set(false);
//don't wait for ever as this may hang tests etc, we've probably been closed anyway
long now = System.currentTimeMillis();
long deadline = now + 5000;
long deadline = now + timeout;
while (!writable.get() && now < deadline) {
replicationLock.wait(deadline - now);
now = System.currentTimeMillis();

View File

@ -77,4 +77,8 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
void removeRecord(String targetNodeID);
void disconnectRecord(String targetNodeID);
long getCallTimeout();
}

View File

@ -553,6 +553,11 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
return rec.getBridge().isConnected();
}
@Override
public long getCallTimeout() {
return callTimeout;
}
@Override
public Map<String, String> getNodes() {
synchronized (recordsGuard) {

View File

@ -162,7 +162,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
ReplicationFailureListener listener = new ReplicationFailureListener();
rc.addCloseListener(listener);
rc.addFailureListener(listener);
replicationManager = new ReplicationManager(rc, activeMQServer.getExecutorFactory());
replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), activeMQServer.getExecutorFactory());
replicationManager.start();
Thread t = new Thread(new Runnable() {
@Override

View File

@ -191,7 +191,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
setupServer(false);
try {
ClientSessionFactory sf = createSessionFactory(locator);
manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), factory);
manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), factory);
addActiveMQComponent(manager);
manager.start();
Assert.fail("Exception was expected");