Address race condition in HTTP pipeline tests

The Netty 4 HTTP server pipelining tests contain two test cases. The
general idea behind these tests is to submit some requests to a Netty 4
HTTP server, one test with pipelining enabled and another with
pipelining disabled. These requests are submitted to two endpoints, one
with a path like /{id} and another with a path like /slow that takes a
query string parameter sleep. This parameter tells the request handler
how long to sleep before replying. In the pipelining-enabled test, the
responses should come back in exactly the order the requests were
submitted, even when some of the requests hit the slow endpoint with
random sleep durations; this is the guarantee that pipelining provides.
In the pipelining-disabled test, requests were randomly submitted to
/{id} and /slow, with sleep parameters starting at 600ms and increasing
by 100ms for each slow request constructed. We expected all the
responses to the /{id} requests to come back first, because these
requests complete almost instantaneously, followed by the responses to
the /slow requests. Further, we expected the slow responses to come
back ordered by the length of the sleep, the thinking being that 100ms
of separation between each slow request would be enough to avoid any
race. Sadly, this is not the case: the handler threads do sometimes
race, and the slow responses can arrive out of the expected order.
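
For reference, the removed test code (visible in the diff below)
constructed and checked the slow requests roughly as follows; sleep(i)
yields 600ms, 700ms, and so on, so each slow response was pinned to a
fixed position in the response list:

    // old construction of the slow requests (removed in this commit)
    for (int i = 0; i < numberOfSlowRequests; i++) {
        requests.add("/slow?sleep=" + sleep(i)); // sleep(i) = 500 + 100 * (i + 1)
    }

    // old assertion: each slow response expected at an exact position,
    // ordered by sleep duration
    for (int i = 0; i < numberOfSlowRequests; i++) {
        assertThat(responseBodies.get(numberOfRequests - numberOfSlowRequests + i),
            equalTo("/slow?sleep=" + sleep(i)));
    }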

This commit modifies the HTTP server pipelining tests to address this
race condition. The modification is that the query string parameter on
the /slow endpoint is removed in favor of just submitting requests to
the path /slow/{id}, where the id serves only as a marker to
distinguish each request. The server chooses a random sleep between
500ms and 1000ms for each request on the slow path. The assertion is
then that the /{id} responses arrive first, followed by the /slow
responses. We can not make an assertion on the order of the /slow
responses, but we can assert that every expected response was seen.
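
Concretely, the pipelining-disabled test now records which ids were
routed to the slow path and asserts a fast prefix followed by the
complete set of slow ids; a condensed view of the new assertions (the
full change is in the diff below):

    // fast responses must occupy the leading positions
    for (int i = 0; i < numberOfRequests - numberOfSlowRequests; i++) {
        assertThat(responseBodies.get(i), matches("/\\d+"));
    }

    // slow responses come last, in no particular order, but every
    // expected id must appear exactly once
    final Set<Integer> ids = new HashSet<>();
    for (int i = 0; i < numberOfSlowRequests; i++) {
        final String response = responseBodies.get(numberOfRequests - numberOfSlowRequests + i);
        assertThat(response, matches("/slow/\\d+"));
        assertTrue(ids.add(Integer.parseInt(response.split("/")[2])));
    }
    assertThat(slowIds, equalTo(ids));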

Relates #19845
Jason Tedor 2016-08-25 14:34:11 -04:00 committed by GitHub
parent 139d3f957f
commit 5a48ad661d
1 changed file with 32 additions and 24 deletions


@@ -25,14 +25,12 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.http.QueryStringDecoder;
 import org.elasticsearch.common.network.NetworkService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -51,10 +49,13 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.hasSize;
@@ -95,7 +96,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
         final List<String> requests = new ArrayList<>(numberOfRequests);
         for (int i = 0; i < numberOfRequests; i++) {
             if (rarely()) {
-                requests.add("/slow?sleep=" + scaledRandomIntBetween(500, 1000));
+                requests.add("/slow/" + i);
             } else {
                 requests.add("/" + i);
             }
@@ -120,30 +121,39 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
                 (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
         final int numberOfRequests = randomIntBetween(4, 16);
-        final int numberOfSlowRequests = scaledRandomIntBetween(1, numberOfRequests);
+        final Set<Integer> slowIds = new HashSet<>();
         final List<String> requests = new ArrayList<>(numberOfRequests);
-        for (int i = 0; i < numberOfRequests - numberOfSlowRequests; i++) {
-            requests.add("/" + i);
-        }
-        for (int i = 0; i < numberOfSlowRequests; i++) {
-            requests.add("/slow?sleep=" + sleep(i));
+        int numberOfSlowRequests = 0;
+        for (int i = 0; i < numberOfRequests; i++) {
+            if (rarely()) {
+                requests.add("/slow/" + i);
+                slowIds.add(i);
+                numberOfSlowRequests++;
+            } else {
+                requests.add("/" + i);
+            }
         }
 
         try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
             Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{}));
             List<String> responseBodies = new ArrayList<>(Netty4HttpClient.returnHttpResponseBodies(responses));
-            // we cannot be sure about the order of the fast requests, but the slow ones should have to be last
+            // we can not be sure about the order of the responses, but the slow ones should
+            // come last
             assertThat(responseBodies, hasSize(numberOfRequests));
+            for (int i = 0; i < numberOfRequests - numberOfSlowRequests; i++) {
+                assertThat(responseBodies.get(i), matches("/\\d+"));
+            }
+            final Set<Integer> ids = new HashSet<>();
             for (int i = 0; i < numberOfSlowRequests; i++) {
-                assertThat(responseBodies.get(numberOfRequests - numberOfSlowRequests + i), equalTo("/slow?sleep=" + sleep(i)));
+                final String response = responseBodies.get(numberOfRequests - numberOfSlowRequests + i);
+                assertThat(response, matches("/slow/\\d+"));
+                assertTrue(ids.add(Integer.parseInt(response.split("/")[2])));
             }
+            assertThat(slowIds, equalTo(ids));
         }
     }
 
-    private int sleep(int index) {
-        return 500 + 100 * (index + 1);
-    }
-
     class CustomNettyHttpServerTransport extends Netty4HttpServerTransport {
@@ -237,17 +247,15 @@
             final DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buffer);
             httpResponse.headers().add(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes());
 
-            final QueryStringDecoder decoder = new QueryStringDecoder(uri);
-            final int timeout =
-                uri.startsWith("/slow") && decoder.parameters().containsKey("sleep") ?
-                    Integer.valueOf(decoder.parameters().get("sleep").get(0)) : 0;
-            if (timeout > 0) {
+            final boolean slow = uri.matches("/slow/\\d+");
+            if (slow) {
                 try {
-                    Thread.sleep(timeout);
+                    Thread.sleep(scaledRandomIntBetween(500, 1000));
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
+            } else {
+                assert uri.matches("/\\d+");
             }
 
             if (pipelinedRequest != null) {