[Tests] Fixed some issues with SlowClusterStateProcessing

Reduced expected time to heal to 0 (we interrupt and wait on stop disruption). It was also  wrongly indicated in seconds.
We didn't properly wait between slow cluster state tasks
This commit is contained in:
Boaz Leskes 2014-07-20 21:00:25 +03:00
parent c2142c0f6d
commit a40984887b
1 changed files with 26 additions and 11 deletions

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class SlowClusterStateProcessing extends SingleNodeDisruption { public class SlowClusterStateProcessing extends SingleNodeDisruption {
@ -75,7 +76,9 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
if (worker == null) { if (worker == null) {
return; return;
} }
logger.info("stopping to slow down cluster state processing on [{}]", disruptedNode);
disrupting = false; disrupting = false;
worker.interrupt();
try { try {
worker.join(2 * (intervalBetweenDelaysMax + delayDurationMax)); worker.join(2 * (intervalBetweenDelaysMax + delayDurationMax));
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -85,49 +88,61 @@ public class SlowClusterStateProcessing extends SingleNodeDisruption {
} }
private synchronized boolean interruptClusterStateProcessing(final TimeValue duration) { private boolean interruptClusterStateProcessing(final TimeValue duration) throws InterruptedException {
if (disruptedNode == null) { final String disruptionNodeCopy = disruptedNode;
if (disruptionNodeCopy == null) {
return false;
}
logger.info("delaying cluster state updates on node [{}] for [{}]", disruptionNodeCopy, duration);
final CountDownLatch countDownLatch = new CountDownLatch(1);
ClusterService clusterService = cluster.getInstance(ClusterService.class, disruptionNodeCopy);
if (clusterService == null) {
return false; return false;
} }
logger.info("delaying cluster state updates on node [{}] for [{}]", disruptedNode, duration);
ClusterService clusterService = cluster.getInstance(ClusterService.class, disruptedNode);
clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() { clusterService.submitStateUpdateTask("service_disruption_delay", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
Thread.sleep(duration.millis()); Thread.sleep(duration.millis());
countDownLatch.countDown();
return currentState; return currentState;
} }
@Override @Override
public void onFailure(String source, Throwable t) { public void onFailure(String source, Throwable t) {
countDownLatch.countDown();
} }
}); });
try {
countDownLatch.await();
} catch (InterruptedException e) {
// try to wait again, we really want the cluster state thread to be freed up when stopping disruption
countDownLatch.await();
}
return true; return true;
} }
@Override @Override
public TimeValue expectedTimeToHeal() { public TimeValue expectedTimeToHeal() {
return TimeValue.timeValueSeconds(delayDurationMax + intervalBetweenDelaysMax); return TimeValue.timeValueMillis(0);
} }
class BackgroundWorker implements Runnable { class BackgroundWorker implements Runnable {
@Override @Override
public void run() { public void run() {
while (disrupting) { while (disrupting && disruptedNode != null) {
try { try {
TimeValue duration = new TimeValue(delayDurationMin + random.nextInt((int) (delayDurationMax - delayDurationMin))); TimeValue duration = new TimeValue(delayDurationMin + random.nextInt((int) (delayDurationMax - delayDurationMin)));
if (!interruptClusterStateProcessing(duration)) { if (!interruptClusterStateProcessing(duration)) {
continue; continue;
} }
duration = new TimeValue(intervalBetweenDelaysMin + random.nextInt((int) (intervalBetweenDelaysMax - intervalBetweenDelaysMin)));
if (disrupting && disruptedNode != null) {
Thread.sleep(duration.millis()); Thread.sleep(duration.millis());
if (disruptedNode == null) {
return;
} }
} catch (InterruptedException e) {
} catch (Exception e) { } catch (Exception e) {
logger.error("error in background worker", e); logger.error("error in background worker", e);
} }