This test was still very GC heavy in Java 8 runs in particular which seems to slow down request processing to the point of timeouts in some runs. This PR completely removes the large number of O(MB) `byte[]` allocations that were happening in the mock http handler which cuts the allocation rate by about a factor of 5 in my local testing for the GC heavy `testSnapshotWithLargeSegmentFiles` run. Closes #51446 Closes #50754
This commit is contained in:
parent
90285ee907
commit
7914c1a734
|
@ -33,7 +33,6 @@ import org.elasticsearch.common.SuppressForbidden;
|
|||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
|
@ -244,10 +243,10 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
|||
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
||||
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
|
||||
if (countDown.countDown()) {
|
||||
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
|
||||
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(exchange.getRequestBody());
|
||||
assertThat(content.isPresent(), is(true));
|
||||
assertThat(content.get().v1(), equalTo("write_blob_max_retries"));
|
||||
if (Objects.deepEquals(bytes, content.get().v2().array())) {
|
||||
if (Objects.deepEquals(bytes, BytesReference.toBytes(content.get().v2()))) {
|
||||
byte[] response = ("{\"bucket\":\"bucket\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8);
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/json");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
|
|
|
@ -23,12 +23,12 @@ import com.sun.net.httpserver.HttpHandler;
|
|||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.CompositeBytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
|
@ -37,7 +37,6 @@ import org.elasticsearch.rest.RestStatus;
|
|||
import org.elasticsearch.rest.RestUtils;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
|
@ -70,6 +69,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
|
||||
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
|
||||
|
||||
private static final Pattern RANGE_MATCHER = Pattern.compile("bytes=([0-9]*)-([0-9]*)");
|
||||
|
||||
private final ConcurrentMap<String, BytesReference> blobs;
|
||||
private final String bucket;
|
||||
|
||||
|
@ -137,15 +138,15 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
|
||||
if (blob != null) {
|
||||
final String range = exchange.getRequestHeaders().getFirst("Range");
|
||||
Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
|
||||
Matcher matcher = RANGE_MATCHER.matcher(range);
|
||||
if (matcher.find() == false) {
|
||||
throw new AssertionError("Range bytes header does not match expected format: " + range);
|
||||
}
|
||||
|
||||
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? BytesReference.toBytes(blob) : new byte[0];
|
||||
BytesReference response = Integer.parseInt(matcher.group(1)) == 0 ? blob : BytesArray.EMPTY;
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length());
|
||||
response.writeTo(exchange.getResponseBody());
|
||||
} else {
|
||||
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
|
||||
}
|
||||
|
@ -173,7 +174,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
|
||||
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
|
||||
// Multipart upload
|
||||
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(wrappedRequest);
|
||||
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(wrappedRequest);
|
||||
if (content.isPresent()) {
|
||||
blobs.put(content.get().v1(), content.get().v2());
|
||||
|
||||
|
@ -212,35 +213,21 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
|
||||
return;
|
||||
}
|
||||
byte[] blob = BytesReference.toBytes(blobs.get(blobName));
|
||||
BytesReference blob = blobs.get(blobName);
|
||||
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
|
||||
final Integer limit = getContentRangeLimit(range);
|
||||
final int start = getContentRangeStart(range);
|
||||
final int end = getContentRangeEnd(range);
|
||||
|
||||
final ByteArrayOutputStream out = new ByteArrayOutputStream() {
|
||||
@Override
|
||||
public byte[] toByteArray() {
|
||||
return buf;
|
||||
}
|
||||
};
|
||||
long bytesRead = Streams.copy(wrappedRequest, out, new byte[128]);
|
||||
int length = Math.max(end + 1, limit != null ? limit : 0);
|
||||
if ((int) bytesRead > length) {
|
||||
throw new AssertionError("Requesting more bytes than available for blob");
|
||||
}
|
||||
if (length > blob.length) {
|
||||
blob = ArrayUtil.growExact(blob, length);
|
||||
}
|
||||
System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(bytesRead));
|
||||
blobs.put(blobName, new BytesArray(blob));
|
||||
blob = new CompositeBytesReference(blob, Streams.readFully(wrappedRequest));
|
||||
blobs.put(blobName, blob);
|
||||
|
||||
if (limit == null) {
|
||||
exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end));
|
||||
exchange.getResponseHeaders().add("Content-Length", "0");
|
||||
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
|
||||
} else {
|
||||
if (limit > blob.length) {
|
||||
if (limit > blob.length()) {
|
||||
throw new AssertionError("Requesting more bytes than available for blob");
|
||||
}
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
|
||||
|
@ -266,8 +253,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
|
||||
private static final Pattern NAME_PATTERN = Pattern.compile("\"name\":\"([^\"]*)\"");
|
||||
|
||||
public static Optional<Tuple<String, BytesArray>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
|
||||
Tuple<String, BytesArray> content = null;
|
||||
public static Optional<Tuple<String, BytesReference>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
|
||||
Tuple<String, BytesReference> content = null;
|
||||
final BytesReference fullRequestBody;
|
||||
try (InputStream in = new GZIPInputStream(requestBody)) {
|
||||
fullRequestBody = Streams.readFully(in);
|
||||
|
@ -308,10 +295,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
} else {
|
||||
// removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
|
||||
int len = fullRequestBody.length() - startPos - 23;
|
||||
final InputStream stream = fullRequestBody.slice(startPos, len).streamInput();
|
||||
final byte[] buffer = new byte[len];
|
||||
Streams.readFully(stream, buffer);
|
||||
content = Tuple.tuple(name, new BytesArray(buffer));
|
||||
content = Tuple.tuple(name, fullRequestBody.slice(startPos, len));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue