Check that S3 setting `buffer_size` is never greater than `chunk_size`

We can do a better job of validating `buffer_size` and `chunk_size` for S3 repositories.
In particular, we know that:

* `buffer_size` should be at least `5mb`
* `chunk_size` should be no more than `5tb`
* `buffer_size` should not be greater than `chunk_size`

Otherwise, setting `buffer_size` has no effect.
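
In code form, the intent is roughly the sketch below. This is illustrative only (`S3SizeConstraints` and `validate` are made-up names, not the actual API); the real checks land in `Setting` and `S3Repository` further down:

```java
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

// Illustrative sketch only: the three constraints this change enforces.
final class S3SizeConstraints {
    private static final ByteSizeValue MIN_BUFFER = new ByteSizeValue(5, ByteSizeUnit.MB);
    private static final ByteSizeValue MAX_CHUNK = new ByteSizeValue(5, ByteSizeUnit.TB);

    static void validate(ByteSizeValue bufferSize, ByteSizeValue chunkSize) {
        if (bufferSize.bytes() < MIN_BUFFER.bytes()) {
            throw new IllegalArgumentException("buffer_size must be >= " + MIN_BUFFER);
        }
        if (chunkSize.bytes() > MAX_CHUNK.bytes()) {
            throw new IllegalArgumentException("chunk_size must be <= " + MAX_CHUNK);
        }
        if (chunkSize.bytes() < bufferSize.bytes()) {
            throw new IllegalArgumentException("chunk_size can't be lower than buffer_size");
        }
    }
}
```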

For the record:

`chunk_size` is a snapshot setting that applies whatever the repository implementation is.
`buffer_size` is a setting specific to the S3 implementation.

Let's say you are snapshotting a `500mb` file. If you set `chunk_size` to `200mb`, the snapshot service will ask the S3 repository to store the file as three chunks with the following sizes (the split arithmetic is sketched after the list):

* `200mb`
* `200mb`
* `100mb`
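
The split arithmetic, as a small runnable sketch (plain Java, sizes in bytes):

```java
// Sketch: cutting a 500mb file into chunk_size pieces.
class ChunkMath {
    public static void main(String[] args) {
        long fileSize  = 500L * 1024 * 1024;   // 500mb
        long chunkSize = 200L * 1024 * 1024;   // chunk_size = 200mb
        for (long offset = 0; offset < fileSize; offset += chunkSize) {
            long length = Math.min(chunkSize, fileSize - offset);
            System.out.println(length / (1024 * 1024) + "mb");  // prints 200mb, 200mb, 100mb
        }
    }
}
```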

If you set `buffer_size` to `100mb` (the size above which AWS recommends using multipart uploads), the first `200mb` chunk will be uploaded to S3 using the multipart feature in two parts, and the workflow is basically the following (see the SDK sketch after the list):

* create the multipart request and get back an `id` from the AWS S3 platform
* upload part 1: `100mb`
* upload part 2: `100mb`
* "commit" the full upload using the `id`.

Closes #17244.
Author: David Pilato
Date:   2016-03-23 10:39:54 +01:00
Commit: e907b7c11e (parent: f8e84f0bbb)
6 changed files with 100 additions and 22 deletions

@@ -523,6 +523,28 @@ public class Setting<T> extends ToXContentToBytes {
         return new Setting<>(key, defaultValue, (s) -> ByteSizeValue.parseBytesSizeValue(s, key), properties);
     }
 
+    public static Setting<ByteSizeValue> byteSizeSetting(String key, ByteSizeValue value, ByteSizeValue minValue, ByteSizeValue maxValue,
+                                                         Property... properties) {
+        return byteSizeSetting(key, (s) -> value.toString(), minValue, maxValue, properties);
+    }
+
+    public static Setting<ByteSizeValue> byteSizeSetting(String key, Function<Settings, String> defaultValue,
+                                                         ByteSizeValue minValue, ByteSizeValue maxValue,
+                                                         Property... properties) {
+        return new Setting<>(key, defaultValue, (s) -> parseByteSize(s, minValue, maxValue, key), properties);
+    }
+
+    public static ByteSizeValue parseByteSize(String s, ByteSizeValue minValue, ByteSizeValue maxValue, String key) {
+        ByteSizeValue value = ByteSizeValue.parseBytesSizeValue(s, key);
+        if (value.bytes() < minValue.bytes()) {
+            throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be >= " + minValue);
+        }
+        if (value.bytes() > maxValue.bytes()) {
+            throw new IllegalArgumentException("Failed to parse value [" + s + "] for setting [" + key + "] must be =< " + maxValue);
+        }
+        return value;
+    }
+
     public static Setting<TimeValue> positiveTimeSetting(String key, TimeValue defaultValue, Property... properties) {
         return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
     }
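
For illustration, here is how the new bounded variant can be used; the values mirror the S3 settings defined later in this commit (a fragment, assuming the usual `org.elasticsearch.common.settings` and `org.elasticsearch.common.unit` imports):

```java
// Example: a setting with default 100mb, bounded to [5mb, 5tb].
Setting<ByteSizeValue> bufferSize = Setting.byteSizeSetting(
    "repositories.s3.buffer_size",
    new ByteSizeValue(100, ByteSizeUnit.MB),   // default
    new ByteSizeValue(5, ByteSizeUnit.MB),     // minimum
    new ByteSizeValue(5, ByteSizeUnit.TB),     // maximum
    Property.NodeScope);

// Out-of-range values now fail at parse time, e.g.:
//   Setting.parseByteSize("4mb", min, max, "buffer_size")
//   -> IllegalArgumentException: Failed to parse value [4mb] for setting [buffer_size] must be >= 5mb
```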

@@ -189,7 +189,7 @@ The following settings are supported:
 
     Big files can be broken down into chunks during snapshotting if needed.
     The chunk size can be specified in bytes or by using size value notation,
-    i.e. `1g`, `10m`, `5k`. Defaults to `100m`.
+    i.e. `1gb`, `10mb`, `5kb`. Defaults to `1gb`.
 
 `compress`::
@@ -210,7 +210,7 @@ The following settings are supported:
     to split the chunk into several parts, each of `buffer_size` length, and
     to upload each part in its own request. Note that setting a buffer
     size lower than `5mb` is not allowed since it will prevent the use of the
-    Multipart API and may result in upload errors. Defaults to `5mb`.
+    Multipart API and may result in upload errors. Defaults to `100mb`.
 
 `max_retries`::

@@ -36,7 +36,6 @@ import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.BlobStoreException;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 
 import java.util.ArrayList;
@@ -47,8 +46,6 @@ import java.util.Locale;
  */
 public class S3BlobStore extends AbstractComponent implements BlobStore {
 
-    public static final ByteSizeValue MIN_BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB);
-
     private final AmazonS3 client;
 
     private final String bucket;
@@ -72,12 +69,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
         this.bucket = bucket;
         this.region = region;
         this.serverSideEncryption = serverSideEncryption;
-
-        this.bufferSize = (bufferSize != null) ? bufferSize : MIN_BUFFER_SIZE;
-        if (this.bufferSize.getBytes() < MIN_BUFFER_SIZE.getBytes()) {
-            throw new BlobStoreException("Detected a buffer_size for the S3 storage lower than [" + MIN_BUFFER_SIZE + "]");
-        }
-
+        this.bufferSize = bufferSize;
         this.cannedACL = initCannedACL(cannedACL);
         this.numberOfRetries = maxRetries;
         this.storageClass = initStorageClass(storageClass);

@@ -106,19 +106,21 @@ public class S3Repository extends BlobStoreRepository {
          * repositories.s3.buffer_size: Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold,
          * the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and
          * to upload each part in its own request. Note that setting a buffer size lower than 5mb is not allowed since it will prevent the
-         * use of the Multipart API and may result in upload errors. Defaults to 5mb.
+         * use of the Multipart API and may result in upload errors. Defaults to 100mb.
          */
         Setting<ByteSizeValue> BUFFER_SIZE_SETTING =
-            Setting.byteSizeSetting("repositories.s3.buffer_size", S3BlobStore.MIN_BUFFER_SIZE, Property.NodeScope);
+            Setting.byteSizeSetting("repositories.s3.buffer_size", new ByteSizeValue(100, ByteSizeUnit.MB),
+                new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB), Property.NodeScope);
 
         /**
          * repositories.s3.max_retries: Number of retries in case of S3 errors. Defaults to 3.
         */
         Setting<Integer> MAX_RETRIES_SETTING = Setting.intSetting("repositories.s3.max_retries", 3, Property.NodeScope);
 
         /**
-         * repositories.s3.chunk_size: Big files can be broken down into chunks during snapshotting if needed. Defaults to 100m.
+         * repositories.s3.chunk_size: Big files can be broken down into chunks during snapshotting if needed. Defaults to 1gb.
          */
         Setting<ByteSizeValue> CHUNK_SIZE_SETTING =
-            Setting.byteSizeSetting("repositories.s3.chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB), Property.NodeScope);
+            Setting.byteSizeSetting("repositories.s3.chunk_size", new ByteSizeValue(1, ByteSizeUnit.GB),
+                new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB), Property.NodeScope);
 
         /**
          * repositories.s3.compress: When set to true metadata files are stored in compressed format. This setting doesn't affect index
         * files that are already compressed by default. Defaults to false.
@@ -187,7 +189,8 @@ public class S3Repository extends BlobStoreRepository {
          * @see Repositories#BUFFER_SIZE_SETTING
          */
         Setting<ByteSizeValue> BUFFER_SIZE_SETTING =
-            Setting.byteSizeSetting("buffer_size", S3BlobStore.MIN_BUFFER_SIZE, Property.NodeScope);
+            Setting.byteSizeSetting("buffer_size", new ByteSizeValue(100, ByteSizeUnit.MB),
+                new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB), Property.NodeScope);
 
         /**
          * max_retries
          * @see Repositories#MAX_RETRIES_SETTING
@@ -197,7 +200,9 @@ public class S3Repository extends BlobStoreRepository {
          * chunk_size
          * @see Repositories#CHUNK_SIZE_SETTING
          */
-        Setting<ByteSizeValue> CHUNK_SIZE_SETTING = Setting.byteSizeSetting("chunk_size", "-1", Property.NodeScope);
+        Setting<ByteSizeValue> CHUNK_SIZE_SETTING =
+            Setting.byteSizeSetting("chunk_size", new ByteSizeValue(1, ByteSizeUnit.GB),
+                new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.TB), Property.NodeScope);
 
         /**
          * compress
          * @see Repositories#COMPRESS_SETTING
@@ -260,6 +265,12 @@ public class S3Repository extends BlobStoreRepository {
         this.chunkSize = getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING);
         this.compress = getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING);
 
+        // We make sure that chunkSize is bigger than or equal to bufferSize
+        if (this.chunkSize.getMb() < bufferSize.getMb()) {
+            throw new RepositoryException(name.name(), Repository.CHUNK_SIZE_SETTING.getKey() + " (" + this.chunkSize +
+                ") can't be lower than " + Repository.BUFFER_SIZE_SETTING.getKey() + " (" + bufferSize + ").");
+        }
+
         // Parse and validate the user's S3 Storage Class setting
         String storageClass = getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING);
         String cannedACL = getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING);
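
To make the effect concrete, here is a hedged sketch of settings that now fail fast at repository construction (a fragment mirroring the new test below, assuming the same imports as that test):

```java
// Sketch: chunk_size lower than buffer_size is now rejected.
RepositorySettings settings = new RepositorySettings(Settings.EMPTY, Settings.builder()
    .put("buffer_size", "10mb")
    .put("chunk_size", "5mb")    // lower than buffer_size
    .build());
// Constructing the S3 repository with these settings throws:
//   RepositoryException: [s3repo] chunk_size (5mb) can't be lower than buffer_size (10mb).
```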

@@ -21,12 +21,20 @@ package org.elasticsearch.cloud.aws;
 
 import com.amazonaws.Protocol;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.repositories.RepositoryException;
+import org.elasticsearch.repositories.RepositoryName;
+import org.elasticsearch.repositories.RepositorySettings;
+import org.elasticsearch.repositories.s3.S3Repository;
 import org.elasticsearch.test.ESTestCase;
 
+import java.io.IOException;
+
 import static org.elasticsearch.repositories.s3.S3Repository.Repositories;
 import static org.elasticsearch.repositories.s3.S3Repository.Repository;
 import static org.elasticsearch.repositories.s3.S3Repository.getValue;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isEmptyString;
@@ -110,9 +118,9 @@ public class RepositoryS3SettingsTests extends ESTestCase {
         assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("global-signer"));
         assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
             is(false));
-        assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
+        assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(100L));
         assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
-        assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
+        assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getGb(), is(1L));
         assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
         assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
         assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
@@ -138,9 +146,9 @@ public class RepositoryS3SettingsTests extends ESTestCase {
         assertThat(AwsS3Service.CLOUD_S3.SIGNER_SETTING.get(nodeSettings), is("s3-signer"));
         assertThat(getValue(repositorySettings, Repository.SERVER_SIDE_ENCRYPTION_SETTING, Repositories.SERVER_SIDE_ENCRYPTION_SETTING),
             is(false));
-        assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(5L));
+        assertThat(getValue(repositorySettings, Repository.BUFFER_SIZE_SETTING, Repositories.BUFFER_SIZE_SETTING).getMb(), is(100L));
         assertThat(getValue(repositorySettings, Repository.MAX_RETRIES_SETTING, Repositories.MAX_RETRIES_SETTING), is(3));
-        assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getMb(), is(100L));
+        assertThat(getValue(repositorySettings, Repository.CHUNK_SIZE_SETTING, Repositories.CHUNK_SIZE_SETTING).getGb(), is(1L));
         assertThat(getValue(repositorySettings, Repository.COMPRESS_SETTING, Repositories.COMPRESS_SETTING), is(false));
         assertThat(getValue(repositorySettings, Repository.STORAGE_CLASS_SETTING, Repositories.STORAGE_CLASS_SETTING), isEmptyString());
         assertThat(getValue(repositorySettings, Repository.CANNED_ACL_SETTING, Repositories.CANNED_ACL_SETTING), isEmptyString());
@@ -292,6 +300,25 @@ public class RepositoryS3SettingsTests extends ESTestCase {
         assertThat(getValue(repositorySettings, Repository.BASE_PATH_SETTING, Repositories.BASE_PATH_SETTING), is("repository-basepath"));
     }
 
+    /**
+     * We test wrong chunk_size and buffer_size settings
+     */
+    public void testInvalidChunkBufferSizeRepositorySettings() throws IOException {
+        // chunk < buffer should fail
+        internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(10, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.MB),
+            "chunk_size (5mb) can't be lower than buffer_size (10mb).");
+        // chunk > buffer should pass
+        internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB), null);
+        // chunk = buffer should pass
+        internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(5, ByteSizeUnit.MB), null);
+        // buffer < 5mb should fail
+        internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(4, ByteSizeUnit.MB), new ByteSizeValue(10, ByteSizeUnit.MB),
+            "Failed to parse value [4mb] for setting [buffer_size] must be >= 5mb");
+        // chunk > 5tb should fail
+        internalTestInvalidChunkBufferSizeSettings(new ByteSizeValue(5, ByteSizeUnit.MB), new ByteSizeValue(6, ByteSizeUnit.TB),
+            "Failed to parse value [6tb] for setting [chunk_size] must be =< 5tb");
+    }
+
     private Settings buildSettings(Settings... global) {
         Settings.Builder builder = Settings.builder();
         for (Settings settings : global) {
@@ -299,4 +326,28 @@ public class RepositoryS3SettingsTests extends ESTestCase {
         }
         return builder.build();
     }
+
+    private void internalTestInvalidChunkBufferSizeSettings(ByteSizeValue buffer, ByteSizeValue chunk, String expectedMessage)
+        throws IOException {
+        Settings nodeSettings = buildSettings(AWS, S3, REPOSITORIES);
+        RepositorySettings s3RepositorySettings = new RepositorySettings(nodeSettings, Settings.builder()
+            .put(Repository.BUFFER_SIZE_SETTING.getKey(), buffer)
+            .put(Repository.CHUNK_SIZE_SETTING.getKey(), chunk)
+            .build());
+
+        try {
+            new S3Repository(new RepositoryName("s3", "s3repo"), s3RepositorySettings, null, null);
+            fail("We should either raise a NPE, a RepositoryException or an IllegalArgumentException");
+        } catch (RepositoryException e) {
+            assertThat(e.getDetailedMessage(), containsString(expectedMessage));
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), containsString(expectedMessage));
+        } catch (NullPointerException e) {
+            // Because we passed a null AwsS3Service to the constructor, we get a NPE, which is
+            // expected in the context of this test
+            if (expectedMessage != null) {
+                fail("We should have raised a RepositoryException");
+            }
+        }
+    }
 }

@@ -19,6 +19,8 @@
 package org.elasticsearch.cloud.aws.blobstore;
 
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.ByteArrayOutputStream;
@@ -33,7 +35,7 @@ import static org.hamcrest.Matchers.is;
 /**
  * Unit test for {@link S3OutputStream}.
  */
 public class S3OutputStreamTests extends ESTestCase {
-    private static final int BUFFER_SIZE = S3BlobStore.MIN_BUFFER_SIZE.bytesAsInt();
+    private static final int BUFFER_SIZE = new ByteSizeValue(5, ByteSizeUnit.MB).bytesAsInt();
 
     public void testWriteLessDataThanBufferSize() throws IOException {
         MockDefaultS3OutputStream out = newS3OutputStream(BUFFER_SIZE);