Tests: Make MockWebServer thread-safe (elastic/elasticsearch#4351)
The new MockWebServer assumed that responses are inserted at the beginning and removed later on. This was not thread safe. Also this fixes a bug in the HttpExporterIT where there was no wait time for a bulk request, even though the request execution is asynchronous. Closes elastic/elasticsearch#4335 Original commit: elastic/x-pack-elasticsearch@11f31f68bd
This commit is contained in:
parent
1c846dd893
commit
1e7985563a
|
@ -30,11 +30,11 @@ import java.net.InetAddress;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.elasticsearch.test.ESTestCase.terminate;
|
||||
|
||||
|
@ -49,13 +49,12 @@ import static org.elasticsearch.test.ESTestCase.terminate;
|
|||
public class MockWebServer implements Closeable {
|
||||
|
||||
private HttpServer server;
|
||||
private final AtomicInteger index = new AtomicInteger(0);
|
||||
private final List<MockResponse> responses = new ArrayList<>();
|
||||
private final List<MockRequest> requests = new ArrayList<>();
|
||||
private final Queue<MockResponse> responses = ConcurrentCollections.newQueue();
|
||||
private final Queue<MockRequest> requests = ConcurrentCollections.newQueue();
|
||||
private final Logger logger;
|
||||
private final SSLContext sslContext;
|
||||
private boolean needClientAuth;
|
||||
private Set<CountDownLatch> latches = ConcurrentCollections.newConcurrentSet();
|
||||
private final boolean needClientAuth;
|
||||
private final Set<CountDownLatch> latches = ConcurrentCollections.newConcurrentSet();
|
||||
|
||||
/**
|
||||
* Instantiates a webserver without https
|
||||
|
@ -93,13 +92,16 @@ public class MockWebServer implements Closeable {
|
|||
|
||||
server.start();
|
||||
server.createContext("/", s -> {
|
||||
logger.debug("incoming HTTP request [{} {}]", s.getRequestMethod(), s.getRequestURI());
|
||||
|
||||
try {
|
||||
MockResponse response = responses.get(index.getAndAdd(1));
|
||||
MockResponse response = responses.poll();
|
||||
MockRequest request = createRequest(s);
|
||||
requests.add(request);
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("[{}:{}] incoming HTTP request [{} {}], returning status [{}] body [{}]", getHostName(), getPort(),
|
||||
s.getRequestMethod(), s.getRequestURI(), response.getStatusCode(), getStartOfBody(response));
|
||||
}
|
||||
|
||||
sleepIfNeeded(response.getBeforeReplyDelay());
|
||||
|
||||
s.getResponseHeaders().putAll(response.getHeaders().headers);
|
||||
|
@ -196,6 +198,10 @@ public class MockWebServer implements Closeable {
|
|||
* @param response The created mock response
|
||||
*/
|
||||
public void enqueue(MockResponse response) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}:{}] Enqueueing response [{}], status [{}] body [{}]", getHostName(), getPort(), responses.size(),
|
||||
response.getStatusCode(), getStartOfBody(response));
|
||||
}
|
||||
responses.add(response);
|
||||
}
|
||||
|
||||
|
@ -203,15 +209,23 @@ public class MockWebServer implements Closeable {
|
|||
* @return The requests that have been made to this mock web server
|
||||
*/
|
||||
public List<MockRequest> requests() {
|
||||
return requests;
|
||||
return new ArrayList<>(requests);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the first request in the list of requests and returns it to the caller.
|
||||
* This can be used as a queue if you know the order of your requests deone.
|
||||
* This can be used as a queue if you are sure the order of your requests.
|
||||
*/
|
||||
public MockRequest takeRequest() {
|
||||
return requests.remove(0);
|
||||
return requests.poll();
|
||||
}
|
||||
|
||||
/**
|
||||
* A utility method to peek into the requests and find out if #MockWebServer.takeRequests will not throw an out of bound exception
|
||||
* @return true if more requests are available, false otherwise
|
||||
*/
|
||||
public boolean hasMoreRequests() {
|
||||
return requests.isEmpty() == false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -220,7 +234,7 @@ public class MockWebServer implements Closeable {
|
|||
*/
|
||||
@Override
|
||||
public void close() {
|
||||
logger.debug("Counting down all latches before terminating executor");
|
||||
logger.debug("[{}:{}] Counting down all latches before terminating executor", getHostName(), getPort());
|
||||
latches.forEach(CountDownLatch::countDown);
|
||||
|
||||
if (server.getExecutor() instanceof ExecutorService) {
|
||||
|
@ -231,4 +245,17 @@ public class MockWebServer implements Closeable {
|
|||
}
|
||||
server.stop(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to return the first 20 chars of a request's body
|
||||
* @param response The MockResponse to inspect
|
||||
* @return Returns the first 20 chars or an empty string if the response body is not configured
|
||||
*/
|
||||
private String getStartOfBody(MockResponse response) {
|
||||
if (Strings.isEmpty(response.getBody())) {
|
||||
return "";
|
||||
}
|
||||
int length = Math.min(20, response.getBody().length());
|
||||
return response.getBody().substring(0, length).replaceAll("\n", "");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,6 +62,7 @@ import static org.hamcrest.Matchers.containsString;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
|
@ -229,10 +230,10 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
|
|||
// pretend that one of the templates is missing
|
||||
for (Tuple<String, String> template : monitoringTemplates()) {
|
||||
if (template.v1().contains(MonitoringBulkTimestampedResolver.Data.DATA)) {
|
||||
enqueueResponse(secondWebServer, 200, "template [" + template + "] exists");
|
||||
enqueueResponse(secondWebServer, 200, "template [" + template.v1() + "] exists");
|
||||
} else {
|
||||
enqueueResponse(secondWebServer, 404, "template [" + template + "] does not exist");
|
||||
enqueueResponse(secondWebServer, 201, "template [" + template + "] created");
|
||||
enqueueResponse(secondWebServer, 404, "template [" + template.v1() + "] does not exist");
|
||||
enqueueResponse(secondWebServer, 201, "template [" + template.v1() + "] created");
|
||||
}
|
||||
}
|
||||
// opposite of if it existed before
|
||||
|
@ -450,6 +451,8 @@ public class HttpExporterIT extends MonitoringIntegTestCase {
|
|||
@Nullable final Map<String, String[]> customHeaders, @Nullable final String basePath)
|
||||
throws Exception {
|
||||
final String pathPrefix = basePathToAssertablePrefix(basePath);
|
||||
// the bulk request is fired off asynchronously so we might need to take a while, until we can get the request from the webserver
|
||||
assertBusy(() -> assertThat("Waiting for further requests in web server", webServer.hasMoreRequests(), is(true)));
|
||||
final MockRequest request = webServer.takeRequest();
|
||||
|
||||
assertThat(request.getMethod(), equalTo("POST"));
|
||||
|
|
Loading…
Reference in New Issue