From 5a48ad661dc369636c75e90758ddf8b97a621c90 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 25 Aug 2016 14:34:11 -0400 Subject: [PATCH] Address race condition in HTTP pipeline tests The Netty 4 HTTP server pipeline tests contains two different test cases. The general idea behind these tests is to submit some requests to a Netty 4 HTTP server, one test with pipelining enabled and another test with pipelining disabled. These requests are submitted to two endpoints, one with a path like /{id} and another with a path like /slow with a query string parameter sleep. This parameter tells the request handler how long to sleep for before replying. The idea is that in the case of the pipelining enabled tests, the requests should come back exactly in the order submitted, even with some of the requests hitting the slow endpoint with random sleep durations; this is the guarantee that pipelining provides. And in the case of the pipelining disabled tests, requests were randombly submitted to /{id} and /slow with sleep parameters starting at 600ms and increasing by 100ms for each slow request constructed. We would expect the requests to come back with the all the responses to the /{id} requests first because these requests will execute instantaneously, and then the responses to the /slow requests. Further, it was expected that the slow requests would come back ordered by the length of the sleep, the thinking being that 100ms should be enough of a difference between each request that we would avoid any race conditions. Sadly, this is not the case, the threads do sometimes hit race conditions. This commit modifies the HTTP server pipelining tests to address this race condition. The modification is that the query string parameter on the /slow endpoint is removed in favor of just submitting requests to the path /slow/{id}, where id just used a marker to distinguish each request. The server chooses a random sleep of at least 500ms for each request on the slow path. The assertion here then is that the /{id} responses arrive first, then then /slow responses. We can not make an assertion on the order of the responses, but we can assert that we did see every expected response. Relates #19845 --- .../Netty4HttpServerPipeliningTests.java | 56 +++++++++++-------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index 155bbe4bb5b..63e35a786c2 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -25,14 +25,12 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.QueryStringDecoder; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -51,10 +49,13 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.hasSize; @@ -95,7 +96,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { final List requests = new ArrayList<>(numberOfRequests); for (int i = 0; i < numberOfRequests; i++) { if (rarely()) { - requests.add("/slow?sleep=" + scaledRandomIntBetween(500, 1000)); + requests.add("/slow/" + i); } else { requests.add("/" + i); } @@ -120,32 +121,41 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); final int numberOfRequests = randomIntBetween(4, 16); - final int numberOfSlowRequests = scaledRandomIntBetween(1, numberOfRequests); + final Set slowIds = new HashSet<>(); final List requests = new ArrayList<>(numberOfRequests); - for (int i = 0; i < numberOfRequests - numberOfSlowRequests; i++) { - requests.add("/" + i); - } - for (int i = 0; i < numberOfSlowRequests; i++) { - requests.add("/slow?sleep=" + sleep(i)); + int numberOfSlowRequests = 0; + for (int i = 0; i < numberOfRequests; i++) { + if (rarely()) { + requests.add("/slow/" + i); + slowIds.add(i); + numberOfSlowRequests++; + } else { + requests.add("/" + i); + } } try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) { Collection responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{})); List responseBodies = new ArrayList<>(Netty4HttpClient.returnHttpResponseBodies(responses)); - // we cannot be sure about the order of the fast requests, but the slow ones should have to be last + // we can not be sure about the order of the responses, but the slow ones should + // come last assertThat(responseBodies, hasSize(numberOfRequests)); - for (int i = 0; i < numberOfSlowRequests; i++) { - assertThat(responseBodies.get(numberOfRequests - numberOfSlowRequests + i), equalTo("/slow?sleep=" + sleep(i))); + for (int i = 0; i < numberOfRequests - numberOfSlowRequests; i++) { + assertThat(responseBodies.get(i), matches("/\\d+")); } + + final Set ids = new HashSet<>(); + for (int i = 0; i < numberOfSlowRequests; i++) { + final String response = responseBodies.get(numberOfRequests - numberOfSlowRequests + i); + assertThat(response, matches("/slow/\\d+" )); + assertTrue(ids.add(Integer.parseInt(response.split("/")[2]))); + } + + assertThat(slowIds, equalTo(ids)); } } } - - private int sleep(int index) { - return 500 + 100 * (index + 1); - } - class CustomNettyHttpServerTransport extends Netty4HttpServerTransport { private final ExecutorService executorService = Executors.newCachedThreadPool(); @@ -237,17 +247,15 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { final DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buffer); httpResponse.headers().add(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes()); - final QueryStringDecoder decoder = new QueryStringDecoder(uri); - - final int timeout = - uri.startsWith("/slow") && decoder.parameters().containsKey("sleep") ? - Integer.valueOf(decoder.parameters().get("sleep").get(0)) : 0; - if (timeout > 0) { + final boolean slow = uri.matches("/slow/\\d+"); + if (slow) { try { - Thread.sleep(timeout); + Thread.sleep(scaledRandomIntBetween(500, 1000)); } catch (InterruptedException e) { throw new RuntimeException(e); } + } else { + assert uri.matches("/\\d+"); } if (pipelinedRequest != null) {