Faster and Simpler GCS REST Mock (#50706) (#50707)

* Faster and Simpler GCS REST Mock

I reworked the GCS mock a little so that it does less copying and allocation, logs the
full request body when it fails to read a multi-part request, and is generally simpler
and easier to follow. This should help track down the remaining issues in this class's
multi-part request parsing, which cause almost daily failures that cannot be reproduced
locally.
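For context, here is a minimal sketch (not part of this change) of the kind of gzip-compressed
multipart upload body this parser deals with, and of how parseMultipartRequestBody could be
exercised against one. The "__END_OF_PART__" boundary is inferred from the trailing-delimiter
comment in the handler; the exact part headers, the metadata JSON layout, and the Tuple and
BytesArray import paths are assumptions rather than something this commit defines.

// Sketch only: exercises fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody
// with a hand-built multipart body. Boundary, part headers, and import paths are assumptions.
import org.elasticsearch.common.bytes.BytesArray;   // assumed package in this codebase
import org.elasticsearch.common.collect.Tuple;      // assumed package in this codebase

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Optional;
import java.util.zip.GZIPOutputStream;

import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
import static java.nio.charset.StandardCharsets.UTF_8;

public class MultipartParseSketch {
    public static void main(String[] args) throws Exception {
        // One metadata part ({"bucket":...,"name":...}) followed by one data part,
        // terminated by the 23-byte trailer "\r\n--__END_OF_PART__--\r\n" the handler strips.
        final String body =
            "--__END_OF_PART__\r\n"
                + "Content-Type: application/json; charset=UTF-8\r\n"
                + "\r\n"
                + "{\"bucket\":\"bucket\",\"name\":\"path/to/blob\"}\r\n"
                + "--__END_OF_PART__\r\n"
                + "content-transfer-encoding: binary\r\n"
                + "\r\n"
                + "hello world"
                + "\r\n--__END_OF_PART__--\r\n";

        // The handler only ever sees gzip-compressed request bodies, so compress first.
        final ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            gzip.write(body.getBytes(UTF_8));
        }

        final Optional<Tuple<String, BytesArray>> parsed =
            parseMultipartRequestBody(new ByteArrayInputStream(compressed.toByteArray()));

        // Expected (assuming NAME_PATTERN matches the "name" field of the metadata JSON):
        // blob name "path/to/blob" mapped to the raw content "hello world".
        parsed.ifPresent(blob -> System.out.println(blob.v1() + " -> " + blob.v2().utf8ToString()));
    }
}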
Armin Braun 2020-01-07 20:17:46 +01:00 committed by GitHub
parent de6b62f789
commit d0d48311f4
1 changed file with 55 additions and 54 deletions


@@ -20,6 +20,9 @@ package fixture.gcs;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.SuppressForbidden;
@@ -33,14 +36,14 @@ import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.RestUtils;
 
-import java.io.BufferedInputStream;
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -54,6 +57,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiFunction;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -64,6 +68,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
 public class GoogleCloudStorageHttpHandler implements HttpHandler {
 
+    private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
+
     private final ConcurrentMap<String, BytesReference> blobs;
     private final String bucket;
@@ -262,62 +268,57 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
 
     public static Optional<Tuple<String, BytesArray>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
         Tuple<String, BytesArray> content = null;
-        try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) {
-            String name = null;
-            int read;
-            ByteArrayOutputStream out = new ByteArrayOutputStream() {
-                @Override
-                public byte[] toByteArray() {
-                    return buf;
-                }
-            };
-            boolean skippedEmptyLine = false;
-            while ((read = in.read()) != -1) {
-                out.reset();
-                boolean markAndContinue = false;
-                do { // search next consecutive {carriage return, new line} chars and stop
-                    if ((char) read == '\r') {
-                        int next = in.read();
-                        if (next != -1) {
-                            if (next == '\n') {
-                                break;
-                            }
-                            out.write(read);
-                            out.write(next);
-                            continue;
-                        }
-                    }
-                    out.write(read);
-                } while ((read = in.read()) != -1);
-                final String bucketPrefix = "{\"bucket\":";
-                final String start = new String(out.toByteArray(), 0, Math.min(out.size(), bucketPrefix.length()), UTF_8);
-                if ((skippedEmptyLine == false && start.length() == 0) || start.startsWith("--")
-                    || start.toLowerCase(Locale.ROOT).startsWith("content")) {
-                    markAndContinue = true;
-                } else if (start.startsWith(bucketPrefix)) {
-                    markAndContinue = true;
-                    final String line = new String(out.toByteArray(), bucketPrefix.length(), out.size() - bucketPrefix.length(), UTF_8);
-                    Matcher matcher = NAME_PATTERN.matcher(line);
-                    if (matcher.find()) {
-                        name = matcher.group(1);
-                    }
-                }
-                if (markAndContinue) {
-                    skippedEmptyLine = start.length() == 0;
-                    in.mark(Integer.MAX_VALUE);
-                    continue;
-                }
-                if (name != null) {
-                    in.reset();
-                    out.reset();
-                    while ((read = in.read()) != -1) {
-                        out.write(read);
-                    }
-                    // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
-                    content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(out.toByteArray(), out.size() - 23)));
-                    break;
-                }
-            }
+        final BytesReference fullRequestBody;
+        try (InputStream in = new GZIPInputStream(requestBody)) {
+            fullRequestBody = Streams.readFully(in);
+        }
+        String name = null;
+        boolean skippedEmptyLine = false;
+        int startPos = 0;
+        int endPos = 0;
+        while (startPos < fullRequestBody.length()) {
+            do {
+                endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1);
+            } while (endPos >= 0 && fullRequestBody.get(endPos + 1) != '\n');
+            boolean markAndContinue = false;
+            final String bucketPrefix = "{\"bucket\":";
+            if (startPos > 0) {
+                startPos += 2;
+            }
+            if (name == null || skippedEmptyLine == false) {
+                if ((skippedEmptyLine == false && endPos == startPos)
+                    || (fullRequestBody.get(startPos) == '-' && fullRequestBody.get(startPos + 1) == '-')) {
+                    markAndContinue = true;
+                } else {
+                    final String start = fullRequestBody.slice(startPos, Math.min(endPos - startPos, bucketPrefix.length())).utf8ToString();
+                    if (start.toLowerCase(Locale.ROOT).startsWith("content")) {
+                        markAndContinue = true;
+                    } else if (start.startsWith(bucketPrefix)) {
+                        markAndContinue = true;
+                        final String line = fullRequestBody.slice(
+                            startPos + bucketPrefix.length(), endPos - startPos - bucketPrefix.length()).utf8ToString();
+                        Matcher matcher = NAME_PATTERN.matcher(line);
+                        if (matcher.find()) {
+                            name = matcher.group(1);
+                        }
+                    }
+                }
+                skippedEmptyLine = markAndContinue && endPos == startPos;
+                startPos = endPos;
+            } else {
+                // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
+                int len = fullRequestBody.length() - startPos - 23;
+                final InputStream stream = fullRequestBody.slice(startPos, len).streamInput();
+                final byte[] buffer = new byte[len];
+                Streams.readFully(stream, buffer);
+                content = Tuple.tuple(name, new BytesArray(buffer));
+                break;
+            }
+        }
+        if (content == null) {
+            final InputStream stream = fullRequestBody.streamInput();
+            logger.warn(() -> new ParameterizedMessage("Failed to find multi-part upload in [{}]", new BufferedReader(
+                new InputStreamReader(stream)).lines().collect(Collectors.joining("\n"))));
         }
         return Optional.ofNullable(content);
     }