Stop delaying existing requests after network delay rule is cleared (#21129)

The network disruption type "network delay" continues delaying existing requests even after the disruption has been cleared. This commit ensures that the requests get to execute right after the delay rule is cleared.
This commit is contained in:
Yannick Welsch 2016-10-27 13:48:17 +02:00 committed by GitHub
parent 952097b1c0
commit f3e578f942
3 changed files with 75 additions and 9 deletions

View File

@ -153,11 +153,25 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
return 1;
}
private boolean disableBeforeIndexDeletion;
@Override
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
if (scheme instanceof NetworkDisruption &&
((NetworkDisruption) scheme).getNetworkLinkDisruptionType() instanceof NetworkUnresponsive) {
// the network unresponsive disruption may leave operations in flight
// this is because this disruption scheme swallows requests by design
// as such, these operations will never be marked as finished
disableBeforeIndexDeletion = true;
}
super.setDisruptionScheme(scheme);
}
@Override
protected void beforeIndexDeletion() {
// some test may leave operations in flight
// this is because the disruption schemes swallow requests by design
// as such, these operations will never be marked as finished
if (disableBeforeIndexDeletion == false) {
super.beforeIndexDeletion();
}
}
private List<String> startCluster(int numberOfNodes) throws ExecutionException, InterruptedException {

View File

@ -58,6 +58,14 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
this.networkLinkDisruptionType = networkLinkDisruptionType;
}
public DisruptedLinks getDisruptedLinks() {
return disruptedLinks;
}
public NetworkLinkDisruptionType getNetworkLinkDisruptionType() {
return networkLinkDisruptionType;
}
@Override
public void applyToCluster(InternalTestCluster cluster) {
this.cluster = cluster;
@ -143,6 +151,11 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
return (MockTransportService) cluster.getInstance(TransportService.class, node);
}
@Override
public String toString() {
return "network disruption (disruption type: " + networkLinkDisruptionType + ", disrupted links: " + disruptedLinks + ")";
}
/**
* Represents a set of nodes with connections between nodes that are to be disrupted
*/

View File

@ -58,9 +58,12 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A mock transport service that allows to simulate different network topology failures.
@ -95,7 +98,7 @@ public final class MockTransportService extends TransportService {
/**
* Build the service.
*
*
* @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
* updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}.
*/
@ -142,7 +145,10 @@ public final class MockTransportService extends TransportService {
* Clears the rule associated with the provided transport address.
*/
public void clearRule(TransportAddress transportAddress) {
transport().transports.remove(transportAddress);
Transport transport = transport().transports.remove(transportAddress);
if (transport instanceof ClearableTransport) {
((ClearableTransport) transport).clearRule();
}
}
/**
@ -292,7 +298,8 @@ public final class MockTransportService extends TransportService {
public void addUnresponsiveRule(TransportAddress transportAddress, final TimeValue duration) {
final long startTime = System.currentTimeMillis();
addDelegate(transportAddress, new DelegateTransport(original) {
addDelegate(transportAddress, new ClearableTransport(original) {
private final Queue<Runnable> requestsToSendWhenCleared = new ConcurrentLinkedQueue<>();
TimeValue getDelay() {
return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime));
@ -362,7 +369,9 @@ public final class MockTransportService extends TransportService {
final TransportRequest clonedRequest = reg.newRequest();
clonedRequest.readFrom(bStream.bytes().streamInput());
threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() {
Runnable runnable = new AbstractRunnable() {
AtomicBoolean requestSent = new AtomicBoolean();
@Override
public void onFailure(Exception e) {
logger.debug("failed to send delayed request", e);
@ -370,9 +379,22 @@ public final class MockTransportService extends TransportService {
@Override
protected void doRun() throws IOException {
original.sendRequest(node, requestId, action, clonedRequest, options);
if (requestSent.compareAndSet(false, true)) {
original.sendRequest(node, requestId, action, clonedRequest, options);
}
}
});
};
// store the request to send it once the rule is cleared.
requestsToSendWhenCleared.add(runnable);
threadPool.schedule(delay, ThreadPool.Names.GENERIC, runnable);
}
@Override
public void clearRule() {
requestsToSendWhenCleared.forEach(Runnable::run);
}
});
}
@ -555,6 +577,23 @@ public final class MockTransportService extends TransportService {
}
}
/**
* The delegate transport instances defined in this class mock various kinds of disruption types. This subclass adds a method
* {@link #clearRule()} so that when the disruptions are cleared (see {@link #clearRule(TransportService)}) this gives the
* disruption a possibility to run clean-up actions.
*/
public abstract static class ClearableTransport extends DelegateTransport {
public ClearableTransport(Transport transport) {
super(transport);
}
/**
* Called by {@link #clearRule(TransportService)}
*/
public abstract void clearRule();
}
List<Tracer> activeTracers = new CopyOnWriteArrayList<>();