Optimize GCS Repo Uploads (#51596) (#51618)

For small uploads (which can still be up to 5MB!) we were needlessly
reading the `InputStream` into a `ByteArrayOutputStream` (BAOS), which entailed allocating
the `byte[]` for the stream contents twice (because `toByteArray` on the BAOS copies its internal buffer).
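In other words, a minimal sketch of the two allocation patterns (`blobSize` and `inputStream` as in the diff below):

    // Before: two full-size allocations for a blob of known size.
    final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize)); // allocation #1: internal buffer
    Streams.copy(inputStream, baos);
    final byte[] bytes = baos.toByteArray(); // allocation #2: toByteArray copies the buffer

    // After: a single allocation, filled directly from the stream.
    final byte[] buffer = new byte[Math.toIntExact(blobSize)];
    org.elasticsearch.common.io.Streams.readFully(inputStream, buffer);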

Also, for resumable uploads we were needlessly wrapping the output channel and running each individual write in its own privileged context, when we could just wrap the whole upload in a single privileged context.
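Schematically (condensed from the diff below):

    // Before: the channel was wrapped so every write crossed into its own privileged block.
    public int write(ByteBuffer src) throws IOException {
        return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
    }

    // After: one privileged block covers opening the writer and the entire copy.
    SocketAccess.doPrivilegedVoidIOException(() ->
        Streams.copy(inputStream, Channels.newOutputStream(client().writer(blobInfo, writeOptions))));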

Relates #51593
Armin Braun 2020-01-29 16:07:30 +01:00 committed by GitHub
parent 078e13b1fd
commit 74e3694234

@@ -22,7 +22,6 @@ package org.elasticsearch.repositories.gcs;
 import com.google.api.gax.paging.Page;
 import com.google.cloud.BatchResult;
 import com.google.cloud.ReadChannel;
-import com.google.cloud.WriteChannel;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
@@ -46,13 +45,11 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.core.internal.io.Streams;

-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
@@ -245,25 +242,8 @@ class GoogleCloudStorageBlobStore implements BlobStore {
             new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
         for (int retry = 0; retry < 3; ++retry) {
             try {
-                final WriteChannel writeChannel = SocketAccess
-                    .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
-                Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
-                    @Override
-                    public boolean isOpen() {
-                        return writeChannel.isOpen();
-                    }
-
-                    @Override
-                    public void close() throws IOException {
-                        SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-                    }
-
-                    @SuppressForbidden(reason = "Channel is based of a socket not a file")
-                    @Override
-                    public int write(ByteBuffer src) throws IOException {
-                        return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
-                    }
-                }));
+                SocketAccess.doPrivilegedVoidIOException(() ->
+                    Streams.copy(inputStream, Channels.newOutputStream(client().writer(blobInfo, writeOptions))));
                 return;
             } catch (final StorageException se) {
                 final int errorCode = se.getCode();
@@ -298,14 +278,14 @@ class GoogleCloudStorageBlobStore implements BlobStore {
     private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
         throws IOException {
         assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method";
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
-        Streams.copy(inputStream, baos);
+        final byte[] buffer = new byte[Math.toIntExact(blobSize)];
+        org.elasticsearch.common.io.Streams.readFully(inputStream, buffer);
         try {
             final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ?
                 new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } :
                 new Storage.BlobTargetOption[0];
             SocketAccess.doPrivilegedVoidIOException(
-                () -> client().create(blobInfo, baos.toByteArray(), targetOptions));
+                () -> client().create(blobInfo, buffer, targetOptions));
         } catch (final StorageException se) {
             if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
                 throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());