From d0d48311f42e741d4e20d81456b68167d61ae425 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 7 Jan 2020 20:17:46 +0100 Subject: [PATCH] Faster and Simpler GCS REST Mock (#50706) (#50707) * Faster and Simpler GCS REST Mock I reworked the GCS mock a little to use less copying+allocation, log the full request body on failure to read a multi-part request and generally be a little simpler and easy to follow to track down the remaining issues that are causing almost daily failures from this class's multi-part request parsing that can't be reproduced locally. --- .../gcs/GoogleCloudStorageHttpHandler.java | 109 +++++++++--------- 1 file changed, 55 insertions(+), 54 deletions(-) diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index 2aadf7a1c88..73f975d6589 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -20,6 +20,9 @@ package fixture.gcs; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; @@ -33,14 +36,14 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; -import java.io.BufferedInputStream; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.URLDecoder; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -54,6 +57,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.function.BiFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import static java.nio.charset.StandardCharsets.UTF_8; @@ -64,6 +68,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint") public class GoogleCloudStorageHttpHandler implements HttpHandler { + private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class); + private final ConcurrentMap blobs; private final String bucket; @@ -262,63 +268,58 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { public static Optional> parseMultipartRequestBody(final InputStream requestBody) throws IOException { Tuple content = null; - try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) { - String name = null; - int read; - ByteArrayOutputStream out = new ByteArrayOutputStream() { - @Override - public byte[] toByteArray() { - return buf; - } - }; - boolean skippedEmptyLine = false; - while ((read = in.read()) != -1) { - out.reset(); - boolean markAndContinue = false; - do { // search next consecutive {carriage return, new line} chars and stop - if ((char) read == '\r') { - int next = in.read(); - if (next != -1) { - if (next == '\n') { - break; - } - out.write(read); - out.write(next); - continue; + final BytesReference fullRequestBody; + try (InputStream in = new GZIPInputStream(requestBody)) { + fullRequestBody = Streams.readFully(in); + } + String name = null; + boolean skippedEmptyLine = false; + int startPos = 0; + int endPos = 0; + while (startPos < fullRequestBody.length()) { + do { + endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1); + } while (endPos >= 0 && fullRequestBody.get(endPos + 1) != '\n'); + boolean markAndContinue = false; + final String bucketPrefix = "{\"bucket\":"; + if (startPos > 0) { + startPos += 2; + } + if (name == null || skippedEmptyLine == false) { + if ((skippedEmptyLine == false && endPos == startPos) + || (fullRequestBody.get(startPos) == '-' && fullRequestBody.get(startPos + 1) == '-')) { + markAndContinue = true; + } else { + final String start = fullRequestBody.slice(startPos, Math.min(endPos - startPos, bucketPrefix.length())).utf8ToString(); + if (start.toLowerCase(Locale.ROOT).startsWith("content")) { + markAndContinue = true; + } else if (start.startsWith(bucketPrefix)) { + markAndContinue = true; + final String line = fullRequestBody.slice( + startPos + bucketPrefix.length(), endPos - startPos - bucketPrefix.length()).utf8ToString(); + Matcher matcher = NAME_PATTERN.matcher(line); + if (matcher.find()) { + name = matcher.group(1); } } - out.write(read); - } while ((read = in.read()) != -1); - final String bucketPrefix = "{\"bucket\":"; - final String start = new String(out.toByteArray(), 0, Math.min(out.size(), bucketPrefix.length()), UTF_8); - if ((skippedEmptyLine == false && start.length() == 0) || start.startsWith("--") - || start.toLowerCase(Locale.ROOT).startsWith("content")) { - markAndContinue = true; - } else if (start.startsWith(bucketPrefix)) { - markAndContinue = true; - final String line = new String(out.toByteArray(), bucketPrefix.length(), out.size() - bucketPrefix.length(), UTF_8); - Matcher matcher = NAME_PATTERN.matcher(line); - if (matcher.find()) { - name = matcher.group(1); - } - } - if (markAndContinue) { - skippedEmptyLine = start.length() == 0; - in.mark(Integer.MAX_VALUE); - continue; - } - if (name != null) { - in.reset(); - out.reset(); - while ((read = in.read()) != -1) { - out.write(read); - } - // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long - content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(out.toByteArray(), out.size() - 23))); - break; } + skippedEmptyLine = markAndContinue && endPos == startPos; + startPos = endPos; + } else { + // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long + int len = fullRequestBody.length() - startPos - 23; + final InputStream stream = fullRequestBody.slice(startPos, len).streamInput(); + final byte[] buffer = new byte[len]; + Streams.readFully(stream, buffer); + content = Tuple.tuple(name, new BytesArray(buffer)); + break; } } + if (content == null) { + final InputStream stream = fullRequestBody.streamInput(); + logger.warn(() -> new ParameterizedMessage("Failed to find multi-part upload in [{}]", new BufferedReader( + new InputStreamReader(stream)).lines().collect(Collectors.joining("\n")))); + } return Optional.ofNullable(content); }