Improve Stability of Mock APIs (#49518) (#49524)

This commit ensures that even for requests that are known to be empty body
we at least attempt to read one bytes from the request body input stream.
This is done to work around the behavior in `sun.net.httpserver.ServerImpl.Dispatcher#handleEvent`
that will close a TCP/HTTP connection that does not have the `eof` flag (see `sun.net.httpserver.LeftOverInputStream#isEOF`)
set on its input stream. As far as I can tell the only way to set this flag is to do a read when there's no more bytes buffered.
This fixes the numerous connection closing issues because the `ServerImpl` stops closing connections that it thinks
weren't fully drained.

Also, I removed a now redundant drain loop in the Azure handler as well as removed the connection closing in the error handler's
drain action (this shouldn't have an effect but makes things more predictable/easier to reason about IMO).

I would suggest merging this and closing related issue after verifying that this fixes things on CI.

The way to locally reproduce the issues we're seeing in tests is to make the retry timings more aggressive in e.g. the azure tests
and move them to single digit values. This makes the retries happen quickly enough that they run into the async connecting closing
of allegedly non-eof connections by `ServerImpl` and produces the exact kinds of failures we're seeing currently.

Relates #49401, #49429
This commit is contained in:
Armin Braun 2019-11-25 10:28:55 +01:00 committed by GitHub
parent 5256756879
commit a5fa86ed97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 13 additions and 7 deletions

View File

@ -31,7 +31,6 @@ import org.elasticsearch.rest.RestUtils;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
@ -63,6 +62,10 @@ public class AzureHttpHandler implements HttpHandler {
@Override @Override
public void handle(final HttpExchange exchange) throws IOException { public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
if (request.startsWith("GET") || request.startsWith("HEAD") || request.startsWith("DELETE")) {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
}
try { try {
if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) { if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) {
// Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block) // Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)
@ -140,9 +143,6 @@ public class AzureHttpHandler implements HttpHandler {
} else if (Regex.simpleMatch("DELETE /" + container + "/*", request)) { } else if (Regex.simpleMatch("DELETE /" + container + "/*", request)) {
// Delete Blob (https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob) // Delete Blob (https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob)
try (InputStream is = exchange.getRequestBody()) {
while (is.read() >= 0);
}
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath())); blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath()));
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1); exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);

View File

@ -76,6 +76,10 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
@Override @Override
public void handle(final HttpExchange exchange) throws IOException { public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
if (request.startsWith("GET") || request.startsWith("HEAD") || request.startsWith("DELETE")) {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
}
try { try {
if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) { if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list // List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list

View File

@ -79,6 +79,10 @@ public class S3HttpHandler implements HttpHandler {
@Override @Override
public void handle(final HttpExchange exchange) throws IOException { public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
if (request.startsWith("GET") || request.startsWith("HEAD") || request.startsWith("DELETE")) {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
}
try { try {
if (Regex.simpleMatch("POST /" + path + "/*?uploads", request)) { if (Regex.simpleMatch("POST /" + path + "/*?uploads", request)) {
final String uploadId = UUIDs.randomBase64UUID(); final String uploadId = UUIDs.randomBase64UUID();

View File

@ -162,9 +162,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
* Consumes and closes the given {@link InputStream} * Consumes and closes the given {@link InputStream}
*/ */
protected static void drainInputStream(final InputStream inputStream) throws IOException { protected static void drainInputStream(final InputStream inputStream) throws IOException {
try (InputStream is = inputStream) { while (inputStream.read(BUFFER) >= 0) ;
while (is.read(BUFFER) >= 0);
}
} }
/** /**