Merge branch 'fix/17244-s3-chunk-buffer-sizes'
This commit is contained in:
commit
6e80b5f2dd
|
@ -523,6 +523,28 @@ public class Setting<T> extends ToXContentToBytes {
|
|||
return new Setting<>(key, defaultValue, (s) -> ByteSizeValue.parseBytesSizeValue(s, key), properties);
|
||||
}
|
||||
|
||||
public static Setting<ByteSizeValue> byteSizeSetting(String key, ByteSizeValue value, ByteSizeValue minValue, ByteSizeValue maxValue,
|
||||
Property... properties) {
|
||||
return byteSizeSetting(key, (s) -> value.toString(), minValue, maxValue, properties);
|
||||
}
|
||||
|
||||
public static Setting<ByteSizeValue> byteSizeSetting(String key, Function<Settings, String> defaultValue,
|
||||
ByteSizeValue minValue, ByteSizeValue maxValue,
|
||||
Property... properties) {
|
||||
return new Setting<>(key, defaultValue, (s) -> parseByteSize(s, minValue, maxValue, key), properties);
|
||||
}
|
||||
|
||||
public static ByteSizeValue parseByteSize(String s, ByteSizeValue minValue, ByteSizeValue maxValue, String key) {
|
||||
ByteSizeValue value = ByteSizeValue.parseBytesSizeValue(s, key);
|
||||
if (value.bytes() < minValue.bytes()) {
|
||||
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
|
||||
}
|
||||
if (value.bytes() > maxValue.bytes()) {
|
||||
throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be =< " + maxValue);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public static Setting<TimeValue> positiveTimeSetting(String key, TimeValue defaultValue, Property... properties) {
|
||||
return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
|
||||
}
|
||||
|
|
|
@ -189,7 +189,7 @@ The following settings are supported:
|
|||
|
||||
Big files can be broken down into chunks during snapshotting if needed.
|
||||
The chunk size can be specified in bytes or by using size value notation,
|
||||
i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
|
||||
i.e. `1gb`, `10mb`, `5kb`. Defaults to `1gb`.
|
||||
|
||||
`compress`::
|
||||
|
||||
|
@ -210,7 +210,7 @@ The following settings are supported:
|
|||
to split the chunk into several parts, each of `buffer_size` length, and
|
||||
to upload each part in its own request. Note that setting a buffer
|
||||
size lower than `5mb` is not allowed since it will prevents the use of the
|
||||
Multipart API and may result in upload errors. Defaults to `5mb`.
|
||||
Multipart API and may result in upload errors. Defaults to `100mb`.
|
||||
|
||||
`max_retries`::
|
||||
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.elasticsearch.common.blobstore.BlobStore;
|
|||
import org.elasticsearch.common.blobstore.BlobStoreException;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -47,8 +46,6 @@ import java.util.Locale;
|
|||
*/
|
||||
public class S3BlobStore extends AbstractComponent implements BlobStore {
|
||||
|
||||
public static final ByteSizeValue MIN_BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);
|
||||
|
||||
private final AmazonS3 client;
|
||||
|
||||
private final String bucket;
|
||||
|
@ -72,12 +69,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
|
|||
this.bucket = bucket;
|
||||
this.region = region;
|
||||
this.serverSideEncryption = serverSideEncryption;
|
||||
|
||||
this.bufferSize = (bufferSize != null) ? bufferSize : MIN_BUFFER_SIZE;
|
||||
if (this.bufferSize.getBytes() < MIN_BUFFER_SIZE.getBytes()) {
|
||||
throw new BlobStoreException("Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
|
||||
}
|
||||
|
||||
this.bufferSize = bufferSize;
|
||||
this.cannedACL = initCannedACL(cannedACL);
|
||||
this.numberOfRetries = maxRetries;
|
||||
this.storageClass = initStorageClass(storageClass);
|
||||
|
|
|
@ -106,19 +106,21 @@ public class S3Repository extends BlobStoreRepository {
|
|||
* repositories.s3.buffer_size: Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold,
|
||||
* the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and
|
||||
* to upload each part in its own request. Note that setting a buffer size lower than 5mb is not allowed since it will prevents the
|
||||
* use of the Multipart API and may result in upload errors. Defaults to 5mb.
|
||||
* use of the Multipart API and may result in upload errors. Defaults to 100m.
|
||||
*/
|
||||
Setting<ByteSizeValue> BUFFER_SIZE_SETTING =
|
||||
Setting.byteSizeSetting("repositories.s3.buffer_size", S3BlobStore.MIN_BUFFER_SIZE, Property.NodeScope);
|
||||
Setting.byteSizeSetting("repositories.s3.buffer_size", new ByteSizeValue(100, ByteSizeUnit.MB),
|
||||
new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB), Property.NodeScope);
|
||||
/**
|
||||
* repositories.s3.max_retries: Number of retries in case of S3 errors. Defaults to 3.
|
||||
*/
|
||||
Setting<Integer> MAX_RETRIES_SETTING = Setting.intSetting("repositories.s3.max_retries", 3, Property.NodeScope);
|
||||
/**
|
||||
* repositories.s3.chunk_size: Big files can be broken down into chunks during snapshotting if needed. Defaults to 100m.
|
||||
* repositories.s3.chunk_size: Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.
|
||||
*/
|
||||
Setting<ByteSizeValue> CHUNK_SIZE_SETTING =
|
||||
Setting.byteSizeSetting("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB), Property.NodeScope);
|
||||
Setting.byteSizeSetting("repositories.s3.chunk_size", new ByteSizeValue(1, ByteSizeUnit.GB),
|
||||
new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB), Property.NodeScope);
|
||||
/**
|
||||
* repositories.s3.compress: When set to true metadata files are stored in compressed format. This setting doesn’t affect index
|
||||
* files that are already compressed by default. Defaults to false.
|
||||
|
@ -187,7 +189,8 @@ public class S3Repository extends BlobStoreRepository {
|
|||
* @see Repositories#BUFFER_SIZE_SETTING
|
||||
*/
|
||||
Setting<ByteSizeValue> BUFFER_SIZE_SETTING =
|
||||
Setting.byteSizeSetting("buffer_size", S3BlobStore.MIN_BUFFER_SIZE, Property.NodeScope);
|
||||
Setting.byteSizeSetting("buffer_size", new ByteSizeValue(100, ByteSizeUnit.MB),
|
||||
new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB), Property.NodeScope);
|
||||
/**
|
||||
* max_retries
|
||||
* @see Repositories#MAX_RETRIES_SETTING
|
||||
|
@ -197,7 +200,9 @@ public class S3Repository extends BlobStoreRepository {
|
|||
* chunk_size
|
||||
* @see Repositories#CHUNK_SIZE_SETTING
|
||||
*/
|
||||
Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting("chunk_size", "-1", Property.NodeScope);
|
||||
Setting<ByteSizeValue> CHUNK_SIZE_SETTING =
|
||||
Setting.byteSizeSetting("chunk_size", new ByteSizeValue(1, ByteSizeUnit.GB),
|
||||
new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB), Property.NodeScope);
|
||||
/**
|
||||
* compress
|
||||
* @see Repositories#COMPRESS_SETTING
|
||||
|
@ -260,6 +265,12 @@ public class S3Repository extends BlobStoreRepository {
|
|||
this.chunkSize = getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING);
|
||||
this.compress = getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING);
|
||||
|
||||
// We make sure that chunkSize is bigger or equal than/to bufferSize
|
||||
if (this.chunkSize.getBytes() < bufferSize.getBytes()) {
|
||||
throw new RepositoryException(name.name(), Repository.CHUNK_SIZE_SETTING.getKey() + " (" + this.chunkSize +
|
||||
") can't be lower than " + Repository.BUFFER_SIZE_SETTING.getKey() + " (" + bufferSize + ").");
|
||||
}
|
||||
|
||||
// Parse and validate the user's S3 Storage Class setting
|
||||
String storageClass = getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
|
||||
String cannedACL = getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);
|
||||
|
|
|
@ -21,12 +21,20 @@ package org.elasticsearch.cloud.aws;
|
|||
|
||||
import com.amazonaws.Protocol;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.repositories.RepositoryException;
|
||||
import org.elasticsearch.repositories.RepositoryName;
|
||||
import org.elasticsearch.repositories.RepositorySettings;
|
||||
import org.elasticsearch.repositories.s3.S3Repository;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.repositories.s3.S3Repository.Repositories;
|
||||
import static org.elasticsearch.repositories.s3.S3Repository.Repository;
|
||||
import static org.elasticsearch.repositories.s3.S3Repository.getValue;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.isEmptyString;
|
||||
|
||||
|
@ -110,9 +118,9 @@ public class RepositoryS3SettingsTests extends ESTestCase {
|
|||
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
|
||||
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
|
||||
is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(100L));
|
||||
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getGb(), is(1L));
|
||||
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
|
||||
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
|
||||
|
@ -138,9 +146,9 @@ public class RepositoryS3SettingsTests extends ESTestCase {
|
|||
assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
|
||||
assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
|
||||
is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
|
||||
assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(100L));
|
||||
assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
|
||||
assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getGb(), is(1L));
|
||||
assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
|
||||
assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
|
||||
assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
|
||||
|
@ -292,6 +300,25 @@ public class RepositoryS3SettingsTests extends ESTestCase {
|
|||
assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
|
||||
}
|
||||
|
||||
/**
|
||||
* We test wrong Chunk and Buffer settings
|
||||
*/
|
||||
public void testInvalidChunkBufferSizeRepositorySettings() throws IOException {
|
||||
// chunk < buffer should fail
|
||||
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.MB),
|
||||
"chunk_size (5mb) can't be lower than buffer_size (10mb).");
|
||||
// chunk > buffer should pass
|
||||
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB), null);
|
||||
// chunk = buffer should pass
|
||||
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.MB), null);
|
||||
// buffer < 5mb should fail
|
||||
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(4, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB),
|
||||
"Failed to parse value [4mb] for setting [buffer_size] must be >= 5mb");
|
||||
// chunk > 5tb should fail
|
||||
internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(6, ByteSizeUnit.TB),
|
||||
"Failed to parse value [6tb] for setting [chunk_size] must be =< 5tb");
|
||||
}
|
||||
|
||||
private Settings buildSettings(Settings... global) {
|
||||
Settings.Builder builder = Settings.builder();
|
||||
for (Settings settings : global) {
|
||||
|
@ -299,4 +326,28 @@ public class RepositoryS3SettingsTests extends ESTestCase {
|
|||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private void internalTestInvalidChunkBufferSizeSettings(ByteSizeValue buffer, ByteSizeValue chunk, String expectedMessage)
|
||||
throws IOException {
|
||||
Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
|
||||
RepositorySettings s3RepositorySettings = new RepositorySettings(nodeSettings, Settings.builder()
|
||||
.put(Repository.BUFFER_SIZE_SETTING.getKey(), buffer)
|
||||
.put(Repository.CHUNK_SIZE_SETTING.getKey(), chunk)
|
||||
.build());
|
||||
|
||||
try {
|
||||
new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null, null);
|
||||
fail("We should either raise a NPE or a RepositoryException or a IllegalArgumentException");
|
||||
} catch (RepositoryException e) {
|
||||
assertThat(e.getDetailedMessage(), containsString(expectedMessage));
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString(expectedMessage));
|
||||
} catch (NullPointerException e) {
|
||||
// Because we passed to the CTOR a Null AwsS3Service, we get a NPE which is expected
|
||||
// in the context of this test
|
||||
if (expectedMessage != null) {
|
||||
fail("We should have raised a RepositoryException");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.cloud.aws.blobstore;
|
||||
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -33,7 +35,7 @@ import static org.hamcrest.Matchers.is;
|
|||
* Unit test for {@link S3OutputStream}.
|
||||
*/
|
||||
public class S3OutputStreamTests extends ESTestCase {
|
||||
private static final int BUFFER_SIZE = S3BlobStore.MIN_BUFFER_SIZE.bytesAsInt();
|
||||
private static final int BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB).bytesAsInt();
|
||||
|
||||
public void testWriteLessDataThanBufferSize() throws IOException {
|
||||
MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);
|
||||
|
|
Loading…
Reference in New Issue