From f3e578f9428ba91dd44aedc86e1fe23510fb7446 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Oct 2016 13:48:17 +0200 Subject: [PATCH] Stop delaying existing requests after network delay rule is cleared (#21129) The network disruption type "network delay" continues delaying existing requests even after the disruption has been cleared. This commit ensures that the requests get to execute right after the delay rule is cleared. --- .../DiscoveryWithServiceDisruptionsIT.java | 20 ++++++-- .../test/disruption/NetworkDisruption.java | 13 +++++ .../test/transport/MockTransportService.java | 51 ++++++++++++++++--- 3 files changed, 75 insertions(+), 9 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 2ea675ab3f7..2ca9f6f3384 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -153,11 +153,25 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { return 1; } + private boolean disableBeforeIndexDeletion; + + @Override + public void setDisruptionScheme(ServiceDisruptionScheme scheme) { + if (scheme instanceof NetworkDisruption && + ((NetworkDisruption) scheme).getNetworkLinkDisruptionType() instanceof NetworkUnresponsive) { + // the network unresponsive disruption may leave operations in flight + // this is because this disruption scheme swallows requests by design + // as such, these operations will never be marked as finished + disableBeforeIndexDeletion = true; + } + super.setDisruptionScheme(scheme); + } + @Override protected void beforeIndexDeletion() { - // some test may leave operations in flight - // this is because the disruption schemes swallow requests by design - // as such, these operations will never be marked as finished + if (disableBeforeIndexDeletion == false) { + super.beforeIndexDeletion(); + } } private List startCluster(int numberOfNodes) throws ExecutionException, InterruptedException { diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java index f7094d8ae9f..de57eee6937 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java @@ -58,6 +58,14 @@ public class NetworkDisruption implements ServiceDisruptionScheme { this.networkLinkDisruptionType = networkLinkDisruptionType; } + public DisruptedLinks getDisruptedLinks() { + return disruptedLinks; + } + + public NetworkLinkDisruptionType getNetworkLinkDisruptionType() { + return networkLinkDisruptionType; + } + @Override public void applyToCluster(InternalTestCluster cluster) { this.cluster = cluster; @@ -143,6 +151,11 @@ public class NetworkDisruption implements ServiceDisruptionScheme { return (MockTransportService) cluster.getInstance(TransportService.class, node); } + @Override + public String toString() { + return "network disruption (disruption type: " + networkLinkDisruptionType + ", disrupted links: " + disruptedLinks + ")"; + } + /** * Represents a set of nodes with connections between nodes that are to be disrupted */ diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index cef12bc930a..c63f968011e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -58,9 +58,12 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; /** * A mock transport service that allows to simulate different network topology failures. @@ -95,7 +98,7 @@ public final class MockTransportService extends TransportService { /** * Build the service. - * + * * @param clusterSettings if non null the the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings * updates for {@link #TRACE_LOG_EXCLUDE_SETTING} and {@link #TRACE_LOG_INCLUDE_SETTING}. */ @@ -142,7 +145,10 @@ public final class MockTransportService extends TransportService { * Clears the rule associated with the provided transport address. */ public void clearRule(TransportAddress transportAddress) { - transport().transports.remove(transportAddress); + Transport transport = transport().transports.remove(transportAddress); + if (transport instanceof ClearableTransport) { + ((ClearableTransport) transport).clearRule(); + } } /** @@ -292,7 +298,8 @@ public final class MockTransportService extends TransportService { public void addUnresponsiveRule(TransportAddress transportAddress, final TimeValue duration) { final long startTime = System.currentTimeMillis(); - addDelegate(transportAddress, new DelegateTransport(original) { + addDelegate(transportAddress, new ClearableTransport(original) { + private final Queue requestsToSendWhenCleared = new ConcurrentLinkedQueue<>(); TimeValue getDelay() { return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime)); @@ -362,7 +369,9 @@ public final class MockTransportService extends TransportService { final TransportRequest clonedRequest = reg.newRequest(); clonedRequest.readFrom(bStream.bytes().streamInput()); - threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() { + Runnable runnable = new AbstractRunnable() { + AtomicBoolean requestSent = new AtomicBoolean(); + @Override public void onFailure(Exception e) { logger.debug("failed to send delayed request", e); @@ -370,9 +379,22 @@ public final class MockTransportService extends TransportService { @Override protected void doRun() throws IOException { - original.sendRequest(node, requestId, action, clonedRequest, options); + if (requestSent.compareAndSet(false, true)) { + original.sendRequest(node, requestId, action, clonedRequest, options); + } } - }); + }; + + // store the request to send it once the rule is cleared. + requestsToSendWhenCleared.add(runnable); + + threadPool.schedule(delay, ThreadPool.Names.GENERIC, runnable); + } + + + @Override + public void clearRule() { + requestsToSendWhenCleared.forEach(Runnable::run); } }); } @@ -555,6 +577,23 @@ public final class MockTransportService extends TransportService { } } + /** + * The delegate transport instances defined in this class mock various kinds of disruption types. This subclass adds a method + * {@link #clearRule()} so that when the disruptions are cleared (see {@link #clearRule(TransportService)}) this gives the + * disruption a possibility to run clean-up actions. + */ + public abstract static class ClearableTransport extends DelegateTransport { + + public ClearableTransport(Transport transport) { + super(transport); + } + + /** + * Called by {@link #clearRule(TransportService)} + */ + public abstract void clearRule(); + } + List activeTracers = new CopyOnWriteArrayList<>();