Fix security manager bug writing large blobs to GCS (#55421)
* Fix security manager bug writing large blobs to GCS This commit addresses a security manager permissions issue writing large blobs (on the resumable upload path) to GCS. The underlying issue here is that we need to wrap the close and write calls on the channel. It is not enough to do this: SocketAccess.doPrivilegedVoidIOException( () -> Streams.copy( inputStream, Channels.newOutputStream(client().writer(blobInfo, writeOptions)))); This reason that this is not enough is because Streams#copy will be in the stacktrace and it is not granted the security manager permissions needed to close or write this channel. We only grant those permissions to classes loaded in the plugin classloader, and Streams#copy is from the parent classloader. This is why we must wrap the close and write calls as privileged, to truncate the Streams#copy call out of the stacktrace. The reason that this issue is not caught in testing is because the size of data that we use in testing is too small to trigger the large blob resumable upload path. Therefore, we address this by adding a system property to control the threshold, which we can then set in tests to exercise this code path. Prior to rewriting the writeBlobResumable method to wrap the close and write calls as privileged, with this additional test, we are able to reproduce the security manager permissions issue. After adding the wrapping, this test now passes. * Fix forbidden APIs issue * Remove leftover debugging
This commit is contained in:
parent
f46b567563
commit
0a1b566c65
|
@ -20,6 +20,7 @@
|
|||
|
||||
import org.elasticsearch.gradle.MavenFilteringHack
|
||||
import org.elasticsearch.gradle.info.BuildParams
|
||||
import org.elasticsearch.gradle.test.RestIntegTestTask
|
||||
|
||||
import java.nio.file.Files
|
||||
import java.security.KeyPair
|
||||
|
@ -117,9 +118,21 @@ task gcsThirdPartyTests {
|
|||
dependsOn check
|
||||
}
|
||||
|
||||
integTest.mustRunAfter(thirdPartyTest)
|
||||
check.dependsOn thirdPartyTest
|
||||
|
||||
integTest.mustRunAfter(thirdPartyTest)
|
||||
|
||||
/*
|
||||
* We only use a small amount of data in these tests, which means that the resumable upload path is not tested. We add
|
||||
* an additional test that forces the large blob threshold to be small to exercise the resumable upload path.
|
||||
*/
|
||||
task largeBlobIntegTest(type: RestIntegTestTask) {
|
||||
mustRunAfter(thirdPartyTest)
|
||||
}
|
||||
|
||||
check.dependsOn integTest
|
||||
check.dependsOn largeBlobIntegTest
|
||||
|
||||
Map<String, Object> expansions = [
|
||||
'bucket': gcsBucket,
|
||||
'base_path': gcsBasePath + "_integration_tests"
|
||||
|
@ -130,11 +143,15 @@ processTestResources {
|
|||
MavenFilteringHack.filter(it, expansions)
|
||||
}
|
||||
|
||||
integTest {
|
||||
final Closure integTestConfiguration = {
|
||||
dependsOn project(':plugins:repository-gcs').bundlePlugin
|
||||
}
|
||||
|
||||
testClusters.integTest {
|
||||
integTest integTestConfiguration
|
||||
|
||||
largeBlobIntegTest integTestConfiguration
|
||||
|
||||
final Closure testClustersConfiguration = {
|
||||
plugin project(':plugins:repository-gcs').bundlePlugin.archiveFile
|
||||
|
||||
keystore 'gcs.client.integration_test.credentials_file', serviceAccountFile, IGNORE_VALUE
|
||||
|
@ -148,3 +165,11 @@ testClusters.integTest {
|
|||
println "Using an external service to test the repository-gcs plugin"
|
||||
}
|
||||
}
|
||||
|
||||
testClusters.integTest testClustersConfiguration
|
||||
|
||||
testClusters.largeBlobIntegTest testClustersConfiguration
|
||||
testClusters.largeBlobIntegTest {
|
||||
// force large blob uploads by setting the threshold small, forcing this code path to be tested
|
||||
systemProperty 'es.repository_gcs.large_blob_threshold_byte_size', '256'
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.repositories.gcs;
|
|||
|
||||
import com.google.api.gax.paging.Page;
|
||||
import com.google.cloud.BatchResult;
|
||||
import com.google.cloud.WriteChannel;
|
||||
import com.google.cloud.storage.Blob;
|
||||
import com.google.cloud.storage.BlobId;
|
||||
import com.google.cloud.storage.BlobInfo;
|
||||
|
@ -33,6 +34,7 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetadata;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
|
@ -41,11 +43,15 @@ import org.elasticsearch.common.blobstore.BlobStoreException;
|
|||
import org.elasticsearch.common.blobstore.DeleteResult;
|
||||
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.core.internal.io.Streams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -68,7 +74,26 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
|||
// request. Larger files should be uploaded over multiple requests (this is
|
||||
// called "resumable upload")
|
||||
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
|
||||
public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
|
||||
public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE;
|
||||
|
||||
static {
|
||||
final String key = "es.repository_gcs.large_blob_threshold_byte_size";
|
||||
final String largeBlobThresholdByteSizeProperty = System.getProperty(key);
|
||||
if (largeBlobThresholdByteSizeProperty == null) {
|
||||
LARGE_BLOB_THRESHOLD_BYTE_SIZE = Math.toIntExact(new ByteSizeValue(5, ByteSizeUnit.MB).getBytes());
|
||||
} else {
|
||||
final int largeBlobThresholdByteSize;
|
||||
try {
|
||||
largeBlobThresholdByteSize = Integer.parseInt(largeBlobThresholdByteSizeProperty);
|
||||
} catch (final NumberFormatException e) {
|
||||
throw new IllegalArgumentException("failed to parse " + key + " having value [" + largeBlobThresholdByteSizeProperty + "]");
|
||||
}
|
||||
if (largeBlobThresholdByteSize <= 0) {
|
||||
throw new IllegalArgumentException(key + " must be positive but was [" + largeBlobThresholdByteSizeProperty + "]");
|
||||
}
|
||||
LARGE_BLOB_THRESHOLD_BYTE_SIZE = largeBlobThresholdByteSize;
|
||||
}
|
||||
}
|
||||
|
||||
private final String bucketName;
|
||||
private final String clientName;
|
||||
|
@ -212,8 +237,30 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
|||
new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
|
||||
for (int retry = 0; retry < 3; ++retry) {
|
||||
try {
|
||||
SocketAccess.doPrivilegedVoidIOException(() ->
|
||||
Streams.copy(inputStream, Channels.newOutputStream(client().writer(blobInfo, writeOptions))));
|
||||
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
|
||||
/*
|
||||
* It is not enough to wrap the call to Streams#copy, we have to wrap the privileged calls too; this is because Streams#copy
|
||||
* is in the stacktrace and is not granted the permissions needed to close and write the channel.
|
||||
*/
|
||||
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
|
||||
|
||||
@SuppressForbidden(reason = "channel is based on a socket")
|
||||
@Override
|
||||
public int write(final ByteBuffer src) throws IOException {
|
||||
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return writeChannel.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
|
||||
}
|
||||
|
||||
}));
|
||||
return;
|
||||
} catch (final StorageException se) {
|
||||
final int errorCode = se.getCode();
|
||||
|
|
Loading…
Reference in New Issue