Blob store compression fix (#39073)
Blob store compression was not enabled for some of the files in snapshots due to constructor accessing sub-class fields. Fixed to instead accept compress field as constructor param. Also fixed chunk size validation to work. Deprecated repositories.fs.compress setting as well to be able to unify in a future commit.
This commit is contained in:
parent
4aa50ed348
commit
00a26b9dd2
|
@ -83,7 +83,7 @@ public class URLRepository extends BlobStoreRepository {
|
|||
*/
|
||||
public URLRepository(RepositoryMetaData metadata, Environment environment,
|
||||
NamedXContentRegistry namedXContentRegistry) {
|
||||
super(metadata, environment.settings(), namedXContentRegistry);
|
||||
super(metadata, environment.settings(), false, namedXContentRegistry);
|
||||
|
||||
if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
|
||||
throw new RepositoryException(metadata.name(), "missing url");
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.repositories.azure;
|
|||
|
||||
import com.microsoft.azure.storage.LocationMode;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
|
@ -82,16 +81,14 @@ public class AzureRepository extends BlobStoreRepository {
|
|||
|
||||
private final BlobPath basePath;
|
||||
private final ByteSizeValue chunkSize;
|
||||
private final boolean compress;
|
||||
private final Environment environment;
|
||||
private final AzureStorageService storageService;
|
||||
private final boolean readonly;
|
||||
|
||||
public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
|
||||
AzureStorageService storageService) {
|
||||
super(metadata, environment.settings(), namedXContentRegistry);
|
||||
super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
|
||||
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
|
||||
this.compress = Repository.COMPRESS_SETTING.get(metadata.settings());
|
||||
this.environment = environment;
|
||||
this.storageService = storageService;
|
||||
|
||||
|
@ -132,7 +129,7 @@ public class AzureRepository extends BlobStoreRepository {
|
|||
|
||||
logger.debug((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
|
||||
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
|
||||
blobStore, chunkSize, compress, basePath));
|
||||
blobStore, chunkSize, isCompress(), basePath));
|
||||
return blobStore;
|
||||
}
|
||||
|
||||
|
@ -141,14 +138,6 @@ public class AzureRepository extends BlobStoreRepository {
|
|||
return basePath;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected boolean isCompress() {
|
||||
return compress;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
@ -62,7 +62,6 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
|
|||
private final Settings settings;
|
||||
private final GoogleCloudStorageService storageService;
|
||||
private final BlobPath basePath;
|
||||
private final boolean compress;
|
||||
private final ByteSizeValue chunkSize;
|
||||
private final String bucket;
|
||||
private final String clientName;
|
||||
|
@ -70,7 +69,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
|
|||
GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
|
||||
NamedXContentRegistry namedXContentRegistry,
|
||||
GoogleCloudStorageService storageService) {
|
||||
super(metadata, environment.settings(), namedXContentRegistry);
|
||||
super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry);
|
||||
this.settings = environment.settings();
|
||||
this.storageService = storageService;
|
||||
|
||||
|
@ -85,11 +84,10 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
|
|||
this.basePath = BlobPath.cleanPath();
|
||||
}
|
||||
|
||||
this.compress = getSetting(COMPRESS, metadata);
|
||||
this.chunkSize = getSetting(CHUNK_SIZE, metadata);
|
||||
this.bucket = getSetting(BUCKET, metadata);
|
||||
this.clientName = CLIENT_NAME.get(metadata.settings());
|
||||
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, compress);
|
||||
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, isCompress());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -102,11 +100,6 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
|
|||
return basePath;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompress() {
|
||||
return compress;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteSizeValue chunkSize() {
|
||||
return chunkSize;
|
||||
|
|
|
@ -58,7 +58,6 @@ public final class HdfsRepository extends BlobStoreRepository {
|
|||
|
||||
private final Environment environment;
|
||||
private final ByteSizeValue chunkSize;
|
||||
private final boolean compress;
|
||||
private final BlobPath basePath = BlobPath.cleanPath();
|
||||
private final URI uri;
|
||||
private final String pathSetting;
|
||||
|
@ -69,11 +68,10 @@ public final class HdfsRepository extends BlobStoreRepository {
|
|||
|
||||
public HdfsRepository(RepositoryMetaData metadata, Environment environment,
|
||||
NamedXContentRegistry namedXContentRegistry) {
|
||||
super(metadata, environment.settings(), namedXContentRegistry);
|
||||
super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry);
|
||||
|
||||
this.environment = environment;
|
||||
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
|
||||
this.compress = metadata.settings().getAsBoolean("compress", false);
|
||||
|
||||
String uriSetting = getMetadata().settings().get("uri");
|
||||
if (Strings.hasText(uriSetting) == false) {
|
||||
|
@ -239,11 +237,6 @@ public final class HdfsRepository extends BlobStoreRepository {
|
|||
return basePath;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompress() {
|
||||
return compress;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteSizeValue chunkSize() {
|
||||
return chunkSize;
|
||||
|
|
|
@ -19,8 +19,8 @@
|
|||
|
||||
package org.elasticsearch.repositories.s3;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
|
@ -155,8 +155,6 @@ class S3Repository extends BlobStoreRepository {
|
|||
|
||||
private final ByteSizeValue chunkSize;
|
||||
|
||||
private final boolean compress;
|
||||
|
||||
private final BlobPath basePath;
|
||||
|
||||
private final boolean serverSideEncryption;
|
||||
|
@ -174,7 +172,7 @@ class S3Repository extends BlobStoreRepository {
|
|||
final Settings settings,
|
||||
final NamedXContentRegistry namedXContentRegistry,
|
||||
final S3Service service) {
|
||||
super(metadata, settings, namedXContentRegistry);
|
||||
super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
|
||||
this.service = service;
|
||||
|
||||
this.repositoryMetaData = metadata;
|
||||
|
@ -187,7 +185,6 @@ class S3Repository extends BlobStoreRepository {
|
|||
|
||||
this.bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings());
|
||||
this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings());
|
||||
this.compress = COMPRESS_SETTING.get(metadata.settings());
|
||||
|
||||
// We make sure that chunkSize is bigger or equal than/to bufferSize
|
||||
if (this.chunkSize.getBytes() < bufferSize.getBytes()) {
|
||||
|
@ -245,11 +242,6 @@ class S3Repository extends BlobStoreRepository {
|
|||
return basePath;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompress() {
|
||||
return compress;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteSizeValue chunkSize() {
|
||||
return chunkSize;
|
||||
|
|
|
@ -195,6 +195,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
private final Settings settings;
|
||||
|
||||
private final boolean compress;
|
||||
|
||||
private final RateLimiter snapshotRateLimiter;
|
||||
|
||||
private final RateLimiter restoreRateLimiter;
|
||||
|
@ -226,33 +228,37 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
*
|
||||
* @param metadata The metadata for this repository including name and settings
|
||||
* @param settings Settings for the node this repository object is created on
|
||||
* @param compress true if metadata and snapshot files should be compressed
|
||||
*/
|
||||
protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry namedXContentRegistry) {
|
||||
protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, boolean compress,
|
||||
NamedXContentRegistry namedXContentRegistry) {
|
||||
this.settings = settings;
|
||||
this.compress = compress;
|
||||
this.metadata = metadata;
|
||||
this.namedXContentRegistry = namedXContentRegistry;
|
||||
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
|
||||
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
|
||||
readOnly = metadata.settings().getAsBoolean("readonly", false);
|
||||
|
||||
|
||||
indexShardSnapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
|
||||
BlobStoreIndexShardSnapshot::fromXContent, namedXContentRegistry, isCompress());
|
||||
BlobStoreIndexShardSnapshot::fromXContent, namedXContentRegistry, compress);
|
||||
indexShardSnapshotsFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_INDEX_CODEC, SNAPSHOT_INDEX_NAME_FORMAT,
|
||||
BlobStoreIndexShardSnapshots::fromXContent, namedXContentRegistry, isCompress());
|
||||
ByteSizeValue chunkSize = chunkSize();
|
||||
if (chunkSize != null && chunkSize.getBytes() <= 0) {
|
||||
throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]");
|
||||
}
|
||||
BlobStoreIndexShardSnapshots::fromXContent, namedXContentRegistry, compress);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
ByteSizeValue chunkSize = chunkSize();
|
||||
if (chunkSize != null && chunkSize.getBytes() <= 0) {
|
||||
throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]");
|
||||
}
|
||||
globalMetaDataFormat = new ChecksumBlobStoreFormat<>(METADATA_CODEC, METADATA_NAME_FORMAT,
|
||||
MetaData::fromXContent, namedXContentRegistry, isCompress());
|
||||
MetaData::fromXContent, namedXContentRegistry, compress);
|
||||
indexMetaDataFormat = new ChecksumBlobStoreFormat<>(INDEX_METADATA_CODEC, METADATA_NAME_FORMAT,
|
||||
IndexMetaData::fromXContent, namedXContentRegistry, isCompress());
|
||||
IndexMetaData::fromXContent, namedXContentRegistry, compress);
|
||||
snapshotFormat = new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SNAPSHOT_NAME_FORMAT,
|
||||
SnapshotInfo::fromXContentInternal, namedXContentRegistry, isCompress());
|
||||
SnapshotInfo::fromXContentInternal, namedXContentRegistry, compress);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -347,8 +353,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
*
|
||||
* @return true if compression is needed
|
||||
*/
|
||||
protected boolean isCompress() {
|
||||
return false;
|
||||
protected final boolean isCompress() {
|
||||
return compress;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -63,21 +63,19 @@ public class FsRepository extends BlobStoreRepository {
|
|||
new ByteSizeValue(Long.MAX_VALUE), new ByteSizeValue(5), new ByteSizeValue(Long.MAX_VALUE), Property.NodeScope);
|
||||
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
|
||||
public static final Setting<Boolean> REPOSITORIES_COMPRESS_SETTING =
|
||||
Setting.boolSetting("repositories.fs.compress", false, Property.NodeScope);
|
||||
Setting.boolSetting("repositories.fs.compress", false, Property.NodeScope, Property.Deprecated);
|
||||
private final Environment environment;
|
||||
|
||||
private ByteSizeValue chunkSize;
|
||||
|
||||
private final BlobPath basePath;
|
||||
|
||||
private boolean compress;
|
||||
|
||||
/**
|
||||
* Constructs a shared file system repository.
|
||||
*/
|
||||
public FsRepository(RepositoryMetaData metadata, Environment environment,
|
||||
NamedXContentRegistry namedXContentRegistry) {
|
||||
super(metadata, environment.settings(), namedXContentRegistry);
|
||||
super(metadata, environment.settings(), calculateCompress(metadata, environment), namedXContentRegistry);
|
||||
this.environment = environment;
|
||||
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
|
||||
if (location.isEmpty()) {
|
||||
|
@ -105,11 +103,14 @@ public class FsRepository extends BlobStoreRepository {
|
|||
} else {
|
||||
this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(environment.settings());
|
||||
}
|
||||
this.compress = COMPRESS_SETTING.exists(metadata.settings())
|
||||
? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(environment.settings());
|
||||
this.basePath = BlobPath.cleanPath();
|
||||
}
|
||||
|
||||
private static boolean calculateCompress(RepositoryMetaData metadata, Environment environment) {
|
||||
return COMPRESS_SETTING.exists(metadata.settings())
|
||||
? COMPRESS_SETTING.get(metadata.settings()) : REPOSITORIES_COMPRESS_SETTING.get(environment.settings());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BlobStore createBlobStore() throws Exception {
|
||||
final String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
|
||||
|
@ -117,11 +118,6 @@ public class FsRepository extends BlobStoreRepository {
|
|||
return new FsBlobStore(environment.settings(), locationFile);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompress() {
|
||||
return compress;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteSizeValue chunkSize() {
|
||||
return chunkSize;
|
||||
|
|
|
@ -22,8 +22,10 @@ package org.elasticsearch.repositories.blobstore;
|
|||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -232,6 +234,38 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
|||
assertEquals(0, repository.getRepositoryData().getIncompatibleSnapshotIds().size());
|
||||
}
|
||||
|
||||
public void testBadChunksize() throws Exception {
|
||||
final Client client = client();
|
||||
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
|
||||
final String repositoryName = "test-repo";
|
||||
|
||||
expectThrows(RepositoryException.class, () ->
|
||||
client.admin().cluster().preparePutRepository(repositoryName)
|
||||
.setType(REPO_TYPE)
|
||||
.setSettings(Settings.builder().put(node().settings())
|
||||
.put("location", location)
|
||||
.put("chunk_size", randomLongBetween(-10, 0), ByteSizeUnit.BYTES))
|
||||
.get());
|
||||
}
|
||||
|
||||
public void testFsRepositoryCompressDeprecated() {
|
||||
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
|
||||
final Settings settings = Settings.builder().put(node().settings()).put("location", location).build();
|
||||
final RepositoryMetaData metaData = new RepositoryMetaData("test-repo", REPO_TYPE, settings);
|
||||
|
||||
Settings useCompressSettings = Settings.builder()
|
||||
.put(node().getEnvironment().settings())
|
||||
.put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), true)
|
||||
.build();
|
||||
Environment useCompressEnvironment =
|
||||
new Environment(useCompressSettings, node().getEnvironment().configFile());
|
||||
|
||||
new FsRepository(metaData, useCompressEnvironment, null);
|
||||
|
||||
assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" +
|
||||
" See the breaking changes documentation for the next major version.");
|
||||
}
|
||||
|
||||
private BlobStoreRepository setupRepo() {
|
||||
final Client client = client();
|
||||
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
|
||||
|
|
Loading…
Reference in New Issue