Improve Stability of GCS Mock API (#49592) (#49597)

Same as #49518 pretty much but for GCS.
Fixing a few more spots where input stream can get closed
without being fully drained and adding assertions to make sure
it's always drained.
Moved the no-close stream wrapper to production code utilities since
there's a number of spots in production code where it's also useful
(will reuse it there in a follow-up).
This commit is contained in:
Armin Braun 2019-11-26 16:53:51 +01:00 committed by GitHub
parent 26a8ca00db
commit 495b543e63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 19 deletions

View File

@ -213,7 +213,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
if ("/token".equals(exchange.getRequestURI().getPath())) {
try {
// token content is unique per node (not per request)
return Streams.readFully(exchange.getRequestBody()).utf8ToString();
return Streams.readFully(Streams.noCloseStream(exchange.getRequestBody())).utf8ToString();
} catch (IOException e) {
throw new AssertionError("Unable to read token request body", e);
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.BufferedReader;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@ -219,6 +220,21 @@ public abstract class Streams {
}
}
/**
* Wraps an {@link InputStream} such that it's {@code close} method becomes a noop
*
* @param stream {@code InputStream} to wrap
* @return wrapped {@code InputStream}
*/
public static InputStream noCloseStream(InputStream stream) {
return new FilterInputStream(stream) {
@Override
public void close() {
// noop
}
};
}
/**
* Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
* close is called.

View File

@ -30,12 +30,20 @@ import static java.nio.charset.StandardCharsets.UTF_8;
@SuppressForbidden(reason = "Uses a HttpServer to emulate a fake OAuth2 authentication service")
public class FakeOAuth2HttpHandler implements HttpHandler {
private static final byte[] BUFFER = new byte[1024];
@Override
public void handle(final HttpExchange exchange) throws IOException {
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.close();
try {
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
while (exchange.getRequestBody().read(BUFFER) >= 0) ;
} finally {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
exchange.close();
}
}
}

View File

@ -81,6 +81,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
}
try {
// Request body is closed in the finally block
final InputStream wrappedRequest = Streams.noCloseStream(exchange.getRequestBody());
if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
final Map<String, String> params = new HashMap<>();
@ -159,7 +161,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
// Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
final String uri = "/storage/v1/b/" + bucket + "/o/";
final StringBuilder batch = new StringBuilder();
for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) {
for (String line : Streams.readAllLines(new BufferedInputStream(wrappedRequest))) {
if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
batch.append(line).append('\n');
} else if (line.startsWith("DELETE")) {
@ -180,7 +182,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
// Multipart upload
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(wrappedRequest);
if (content.isPresent()) {
blobs.put(content.get().v1(), content.get().v2());
@ -199,7 +201,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
final String blobName = params.get("name");
blobs.put(blobName, BytesArray.EMPTY);
byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
byte[] response = Streams.readFully(wrappedRequest).utf8ToString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?"
+ "uploadType=resumable"
@ -225,7 +227,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
final int end = getContentRangeEnd(range);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
long bytesRead = Streams.copy(exchange.getRequestBody(), out);
long bytesRead = Streams.copy(wrappedRequest, out);
int length = Math.max(end + 1, limit != null ? limit : 0);
if ((int) bytesRead > length) {
throw new AssertionError("Requesting more bytes than available for blob");
@ -250,6 +252,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
}
} finally {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
exchange.close();
}
}

View File

@ -190,16 +190,26 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
@Override
public void handle(final HttpExchange exchange) throws IOException {
final String requestId = requestUniqueId(exchange);
assert Strings.hasText(requestId);
try {
final String requestId = requestUniqueId(exchange);
assert Strings.hasText(requestId);
final boolean canFailRequest = canFailRequest(exchange);
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
if (count >= maxErrorsPerRequest || canFailRequest == false) {
requests.remove(requestId);
delegate.handle(exchange);
} else {
handleAsError(exchange);
final boolean canFailRequest = canFailRequest(exchange);
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
if (count >= maxErrorsPerRequest || canFailRequest == false) {
requests.remove(requestId);
delegate.handle(exchange);
} else {
handleAsError(exchange);
}
} finally {
try {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
} catch (IOException e) {
// ignored, stream is assumed to have been closed by previous handler
}
exchange.close();
}
}