From 1e7985563abca433b3cbde084131d42045222cc3 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Tue, 13 Dec 2016 19:34:33 +0100 Subject: [PATCH] Tests: Make MockWebServer thread-safe (elastic/elasticsearch#4351) The new MockWebServer assumed that responses are inserted at the beginning and removed later on. This was not thread safe. Also this fixes a bug in the HttpExporterIT where there was no wait time for a bulk request, even though the request execution is asynchronous. Closes elastic/elasticsearch#4335 Original commit: elastic/x-pack-elasticsearch@11f31f68bdbec08e975a879bab429408e06c301b --- .../test/http/MockWebServer.java | 53 ++++++++++++++----- .../exporter/http/HttpExporterIT.java | 9 ++-- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/elasticsearch/src/test/java/org/elasticsearch/test/http/MockWebServer.java b/elasticsearch/src/test/java/org/elasticsearch/test/http/MockWebServer.java index 4bed6c5a7f2..ce6239061e3 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/test/http/MockWebServer.java +++ b/elasticsearch/src/test/java/org/elasticsearch/test/http/MockWebServer.java @@ -30,11 +30,11 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.test.ESTestCase.terminate; @@ -49,13 +49,12 @@ import static org.elasticsearch.test.ESTestCase.terminate; public class MockWebServer implements Closeable { private HttpServer server; - private final AtomicInteger index = new AtomicInteger(0); - private final List responses = new ArrayList<>(); - private final List requests = new ArrayList<>(); + private final Queue responses = ConcurrentCollections.newQueue(); + private final Queue requests = ConcurrentCollections.newQueue(); private final Logger logger; private final SSLContext sslContext; - private boolean needClientAuth; - private Set latches = ConcurrentCollections.newConcurrentSet(); + private final boolean needClientAuth; + private final Set latches = ConcurrentCollections.newConcurrentSet(); /** * Instantiates a webserver without https @@ -93,13 +92,16 @@ public class MockWebServer implements Closeable { server.start(); server.createContext("/", s -> { - logger.debug("incoming HTTP request [{} {}]", s.getRequestMethod(), s.getRequestURI()); - try { - MockResponse response = responses.get(index.getAndAdd(1)); + MockResponse response = responses.poll(); MockRequest request = createRequest(s); requests.add(request); + if (logger.isDebugEnabled()) { + logger.debug("[{}:{}] incoming HTTP request [{} {}], returning status [{}] body [{}]", getHostName(), getPort(), + s.getRequestMethod(), s.getRequestURI(), response.getStatusCode(), getStartOfBody(response)); + } + sleepIfNeeded(response.getBeforeReplyDelay()); s.getResponseHeaders().putAll(response.getHeaders().headers); @@ -196,6 +198,10 @@ public class MockWebServer implements Closeable { * @param response The created mock response */ public void enqueue(MockResponse response) { + if (logger.isTraceEnabled()) { + logger.trace("[{}:{}] Enqueueing response [{}], status [{}] body [{}]", getHostName(), getPort(), responses.size(), + response.getStatusCode(), getStartOfBody(response)); + } responses.add(response); } @@ -203,15 +209,23 @@ public class MockWebServer implements Closeable { * @return The requests that have been made to this mock web server */ public List requests() { - return requests; + return new ArrayList<>(requests); } /** * Removes the first request in the list of requests and returns it to the caller. - * This can be used as a queue if you know the order of your requests deone. + * This can be used as a queue if you are sure the order of your requests. */ public MockRequest takeRequest() { - return requests.remove(0); + return requests.poll(); + } + + /** + * A utility method to peek into the requests and find out if #MockWebServer.takeRequests will not throw an out of bound exception + * @return true if more requests are available, false otherwise + */ + public boolean hasMoreRequests() { + return requests.isEmpty() == false; } /** @@ -220,7 +234,7 @@ public class MockWebServer implements Closeable { */ @Override public void close() { - logger.debug("Counting down all latches before terminating executor"); + logger.debug("[{}:{}] Counting down all latches before terminating executor", getHostName(), getPort()); latches.forEach(CountDownLatch::countDown); if (server.getExecutor() instanceof ExecutorService) { @@ -231,4 +245,17 @@ public class MockWebServer implements Closeable { } server.stop(0); } + + /** + * Helper method to return the first 20 chars of a request's body + * @param response The MockResponse to inspect + * @return Returns the first 20 chars or an empty string if the response body is not configured + */ + private String getStartOfBody(MockResponse response) { + if (Strings.isEmpty(response.getBody())) { + return ""; + } + int length = Math.min(20, response.getBody().length()); + return response.getBody().substring(0, length).replaceAll("\n", ""); + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java index 9316d863cfb..216c073ffdc 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java @@ -62,6 +62,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; @@ -229,10 +230,10 @@ public class HttpExporterIT extends MonitoringIntegTestCase { // pretend that one of the templates is missing for (Tuple template : monitoringTemplates()) { if (template.v1().contains(MonitoringBulkTimestampedResolver.Data.DATA)) { - enqueueResponse(secondWebServer, 200, "template [" + template + "] exists"); + enqueueResponse(secondWebServer, 200, "template [" + template.v1() + "] exists"); } else { - enqueueResponse(secondWebServer, 404, "template [" + template + "] does not exist"); - enqueueResponse(secondWebServer, 201, "template [" + template + "] created"); + enqueueResponse(secondWebServer, 404, "template [" + template.v1() + "] does not exist"); + enqueueResponse(secondWebServer, 201, "template [" + template.v1() + "] created"); } } // opposite of if it existed before @@ -450,6 +451,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase { @Nullable final Map customHeaders, @Nullable final String basePath) throws Exception { final String pathPrefix = basePathToAssertablePrefix(basePath); + // the bulk request is fired off asynchronously so we might need to take a while, until we can get the request from the webserver + assertBusy(() -> assertThat("Waiting for further requests in web server", webServer.hasMoreRequests(), is(true))); final MockRequest request = webServer.takeRequest(); assertThat(request.getMethod(), equalTo("POST"));