Motivated by slow snapshot deletes reported in e.g. #39656 and the fact that these likely are a contributing factor to repositories accumulating stale files over time when deletes fail to finish in time and are interrupted before they can complete. * Makes snapshot deletion async and parallelizes some steps of the delete process that can be safely run concurrently via the snapshot thread pool * I did not take the biggest potential speedup step here and parallelize the shard file deletion because that's probably better handled by moving to bulk deletes where possible (and can still be parallelized via the snapshot pool where it isn't). Also, I wanted to keep the size of the PR manageable. * See https://github.com/elastic/elasticsearch/pull/39656#issuecomment-470492106 * Also, as a side effect this gives the `SnapshotResiliencyTests` a little more coverage for master failover scenarios (since parallel access to a blob store repository during deletes is now possible since a delete isn't a single task anymore). * By adding a `ThreadPool` reference to the repository this also lays the groundwork to parallelizing shard snapshot uploads to improve the situation reported in #39657
This commit is contained in:
parent
4127d6889b
commit
aad33121d8
|
@ -26,6 +26,7 @@ import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.plugins.RepositoryPlugin;
|
import org.elasticsearch.plugins.RepositoryPlugin;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
import org.elasticsearch.repositories.url.URLRepository;
|
import org.elasticsearch.repositories.url.URLRepository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -44,7 +45,9 @@ public class URLRepositoryPlugin extends Plugin implements RepositoryPlugin {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
return Collections.singletonMap(URLRepository.TYPE, metadata -> new URLRepository(metadata, env, namedXContentRegistry));
|
ThreadPool threadPool) {
|
||||||
|
return Collections.singletonMap(URLRepository.TYPE,
|
||||||
|
metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
@ -82,8 +83,8 @@ public class URLRepository extends BlobStoreRepository {
|
||||||
* Constructs a read-only URL-based repository
|
* Constructs a read-only URL-based repository
|
||||||
*/
|
*/
|
||||||
public URLRepository(RepositoryMetaData metadata, Environment environment,
|
public URLRepository(RepositoryMetaData metadata, Environment environment,
|
||||||
NamedXContentRegistry namedXContentRegistry) {
|
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
|
||||||
super(metadata, environment.settings(), false, namedXContentRegistry);
|
super(metadata, environment.settings(), false, namedXContentRegistry, threadPool);
|
||||||
|
|
||||||
if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
|
if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
|
||||||
throw new RepositoryException(metadata.name(), "missing url");
|
throw new RepositoryException(metadata.name(), "missing url");
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.TestEnvironment;
|
import org.elasticsearch.env.TestEnvironment;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
@ -34,12 +35,13 @@ import java.util.Collections;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.CoreMatchers.not;
|
import static org.hamcrest.CoreMatchers.not;
|
||||||
import static org.hamcrest.CoreMatchers.nullValue;
|
import static org.hamcrest.CoreMatchers.nullValue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
public class URLRepositoryTests extends ESTestCase {
|
public class URLRepositoryTests extends ESTestCase {
|
||||||
|
|
||||||
private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) {
|
private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) {
|
||||||
return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
|
return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
|
||||||
new NamedXContentRegistry(Collections.emptyList())) {
|
new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) {
|
||||||
@Override
|
@Override
|
||||||
protected void assertSnapshotOrGenericThread() {
|
protected void assertSnapshotOrGenericThread() {
|
||||||
// eliminate thread name check as we create repo manually on test/main threads
|
// eliminate thread name check as we create repo manually on test/main threads
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.elasticsearch.repositories.IndexId;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
import org.elasticsearch.snapshots.SnapshotCreationException;
|
import org.elasticsearch.snapshots.SnapshotCreationException;
|
||||||
import org.elasticsearch.snapshots.SnapshotId;
|
import org.elasticsearch.snapshots.SnapshotId;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -86,8 +87,8 @@ public class AzureRepository extends BlobStoreRepository {
|
||||||
private final boolean readonly;
|
private final boolean readonly;
|
||||||
|
|
||||||
public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
|
public AzureRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
|
||||||
AzureStorageService storageService) {
|
AzureStorageService storageService, ThreadPool threadPool) {
|
||||||
super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
|
super(metadata, environment.settings(), Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
|
||||||
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
|
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
|
||||||
this.environment = environment;
|
this.environment = environment;
|
||||||
this.storageService = storageService;
|
this.storageService = storageService;
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.plugins.ReloadablePlugin;
|
import org.elasticsearch.plugins.ReloadablePlugin;
|
||||||
import org.elasticsearch.plugins.RepositoryPlugin;
|
import org.elasticsearch.plugins.RepositoryPlugin;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -47,9 +49,10 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
|
ThreadPool threadPool) {
|
||||||
return Collections.singletonMap(AzureRepository.TYPE,
|
return Collections.singletonMap(AzureRepository.TYPE,
|
||||||
(metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService));
|
(metadata) -> new AzureRepository(metadata, env, namedXContentRegistry, azureStoreService, threadPool));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.TestEnvironment;
|
import org.elasticsearch.env.TestEnvironment;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
@ -42,7 +43,8 @@ public class AzureRepositorySettingsTests extends ESTestCase {
|
||||||
.put(settings)
|
.put(settings)
|
||||||
.build();
|
.build();
|
||||||
final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings),
|
final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings),
|
||||||
TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class));
|
TestEnvironment.newEnvironment(internalSettings), NamedXContentRegistry.EMPTY, mock(AzureStorageService.class),
|
||||||
|
mock(ThreadPool.class));
|
||||||
assertThat(azureRepository.getBlobStore(), is(nullValue()));
|
assertThat(azureRepository.getBlobStore(), is(nullValue()));
|
||||||
return azureRepository;
|
return azureRepository;
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.plugins.ReloadablePlugin;
|
import org.elasticsearch.plugins.ReloadablePlugin;
|
||||||
import org.elasticsearch.plugins.RepositoryPlugin;
|
import org.elasticsearch.plugins.RepositoryPlugin;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -49,9 +51,10 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
|
ThreadPool threadPool) {
|
||||||
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
|
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
|
||||||
(metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService));
|
metadata -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService, threadPool));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -25,13 +25,13 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
@ -59,7 +59,6 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
|
||||||
byteSizeSetting("chunk_size", MAX_CHUNK_SIZE, MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, Property.NodeScope, Property.Dynamic);
|
byteSizeSetting("chunk_size", MAX_CHUNK_SIZE, MIN_CHUNK_SIZE, MAX_CHUNK_SIZE, Property.NodeScope, Property.Dynamic);
|
||||||
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());
|
static final Setting<String> CLIENT_NAME = new Setting<>("client", "default", Function.identity());
|
||||||
|
|
||||||
private final Settings settings;
|
|
||||||
private final GoogleCloudStorageService storageService;
|
private final GoogleCloudStorageService storageService;
|
||||||
private final BlobPath basePath;
|
private final BlobPath basePath;
|
||||||
private final ByteSizeValue chunkSize;
|
private final ByteSizeValue chunkSize;
|
||||||
|
@ -68,9 +67,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
|
||||||
|
|
||||||
GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
|
GoogleCloudStorageRepository(RepositoryMetaData metadata, Environment environment,
|
||||||
NamedXContentRegistry namedXContentRegistry,
|
NamedXContentRegistry namedXContentRegistry,
|
||||||
GoogleCloudStorageService storageService) {
|
GoogleCloudStorageService storageService, ThreadPool threadPool) {
|
||||||
super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry);
|
super(metadata, environment.settings(), getSetting(COMPRESS, metadata), namedXContentRegistry, threadPool);
|
||||||
this.settings = environment.settings();
|
|
||||||
this.storageService = storageService;
|
this.storageService = storageService;
|
||||||
|
|
||||||
String basePath = BASE_PATH.get(metadata.settings());
|
String basePath = BASE_PATH.get(metadata.settings());
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.plugins.RepositoryPlugin;
|
import org.elasticsearch.plugins.RepositoryPlugin;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
public final class HdfsPlugin extends Plugin implements RepositoryPlugin {
|
public final class HdfsPlugin extends Plugin implements RepositoryPlugin {
|
||||||
|
|
||||||
|
@ -110,7 +111,8 @@ public final class HdfsPlugin extends Plugin implements RepositoryPlugin {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry));
|
ThreadPool threadPool) {
|
||||||
|
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UncheckedIOException;
|
import java.io.UncheckedIOException;
|
||||||
|
@ -67,8 +68,8 @@ public final class HdfsRepository extends BlobStoreRepository {
|
||||||
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);
|
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);
|
||||||
|
|
||||||
public HdfsRepository(RepositoryMetaData metadata, Environment environment,
|
public HdfsRepository(RepositoryMetaData metadata, Environment environment,
|
||||||
NamedXContentRegistry namedXContentRegistry) {
|
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
|
||||||
super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry);
|
super(metadata, environment.settings(), metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, threadPool);
|
||||||
|
|
||||||
this.environment = environment;
|
this.environment = environment;
|
||||||
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
|
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
@ -171,8 +172,8 @@ class S3Repository extends BlobStoreRepository {
|
||||||
S3Repository(final RepositoryMetaData metadata,
|
S3Repository(final RepositoryMetaData metadata,
|
||||||
final Settings settings,
|
final Settings settings,
|
||||||
final NamedXContentRegistry namedXContentRegistry,
|
final NamedXContentRegistry namedXContentRegistry,
|
||||||
final S3Service service) {
|
final S3Service service, final ThreadPool threadPool) {
|
||||||
super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry);
|
super(metadata, settings, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
|
||||||
this.service = service;
|
this.service = service;
|
||||||
|
|
||||||
this.repositoryMetaData = metadata;
|
this.repositoryMetaData = metadata;
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.plugins.ReloadablePlugin;
|
import org.elasticsearch.plugins.ReloadablePlugin;
|
||||||
import org.elasticsearch.plugins.RepositoryPlugin;
|
import org.elasticsearch.plugins.RepositoryPlugin;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.AccessController;
|
import java.security.AccessController;
|
||||||
|
@ -77,13 +78,15 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
|
||||||
// proxy method for testing
|
// proxy method for testing
|
||||||
protected S3Repository createRepository(final RepositoryMetaData metadata,
|
protected S3Repository createRepository(final RepositoryMetaData metadata,
|
||||||
final Settings settings,
|
final Settings settings,
|
||||||
final NamedXContentRegistry registry) {
|
final NamedXContentRegistry registry, final ThreadPool threadPool) {
|
||||||
return new S3Repository(metadata, settings, registry, service);
|
return new S3Repository(metadata, settings, registry, service, threadPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry) {
|
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
|
||||||
return Collections.singletonMap(S3Repository.TYPE, (metadata) -> createRepository(metadata, env.settings(), registry));
|
final ThreadPool threadPool) {
|
||||||
|
return Collections.singletonMap(S3Repository.TYPE,
|
||||||
|
metadata -> createRepository(metadata, env.settings(), registry, threadPool));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -30,12 +30,14 @@ import org.elasticsearch.common.settings.MockSecureSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.AccessController;
|
import java.security.AccessController;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
@SuppressForbidden(reason = "test fixture requires System.setProperty")
|
@SuppressForbidden(reason = "test fixture requires System.setProperty")
|
||||||
public class RepositoryCredentialsTests extends ESTestCase {
|
public class RepositoryCredentialsTests extends ESTestCase {
|
||||||
|
@ -77,8 +79,9 @@ public class RepositoryCredentialsTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry) {
|
protected S3Repository createRepository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry registry,
|
||||||
return new S3Repository(metadata, settings, registry, service){
|
ThreadPool threadPool) {
|
||||||
|
return new S3Repository(metadata, settings, registry, service, threadPool){
|
||||||
@Override
|
@Override
|
||||||
protected void assertSnapshotOrGenericThread() {
|
protected void assertSnapshotOrGenericThread() {
|
||||||
// eliminate thread name check as we create repo manually on test/main threads
|
// eliminate thread name check as we create repo manually on test/main threads
|
||||||
|
@ -106,7 +109,7 @@ public class RepositoryCredentialsTests extends ESTestCase {
|
||||||
.put(S3Repository.ACCESS_KEY_SETTING.getKey(), "insecure_aws_key")
|
.put(S3Repository.ACCESS_KEY_SETTING.getKey(), "insecure_aws_key")
|
||||||
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret").build());
|
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret").build());
|
||||||
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
|
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
|
||||||
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin);
|
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class));
|
||||||
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
|
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
|
||||||
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
|
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
|
||||||
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
|
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
|
||||||
|
@ -129,7 +132,7 @@ public class RepositoryCredentialsTests extends ESTestCase {
|
||||||
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret")
|
.put(S3Repository.SECRET_KEY_SETTING.getKey(), "insecure_aws_secret")
|
||||||
.build());
|
.build());
|
||||||
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(Settings.EMPTY);
|
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(Settings.EMPTY);
|
||||||
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin);
|
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class));
|
||||||
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
|
AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
|
||||||
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
|
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials.getCredentials();
|
||||||
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
|
assertThat(credentials.getAWSAccessKeyId(), is("insecure_aws_key"));
|
||||||
|
@ -144,8 +147,8 @@ public class RepositoryCredentialsTests extends ESTestCase {
|
||||||
+ " See the breaking changes documentation for the next major version.");
|
+ " See the breaking changes documentation for the next major version.");
|
||||||
}
|
}
|
||||||
|
|
||||||
private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin) {
|
private S3Repository createAndStartRepository(RepositoryMetaData metadata, S3RepositoryPlugin s3Plugin, ThreadPool threadPool) {
|
||||||
final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY);
|
final S3Repository repository = s3Plugin.createRepository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, threadPool);
|
||||||
repository.start();
|
repository.start();
|
||||||
return repository;
|
return repository;
|
||||||
}
|
}
|
||||||
|
@ -168,7 +171,7 @@ public class RepositoryCredentialsTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", builder.build());
|
final RepositoryMetaData metadata = new RepositoryMetaData("dummy-repo", "mock", builder.build());
|
||||||
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
|
try (S3RepositoryPlugin s3Plugin = new ProxyS3RepositoryPlugin(settings);
|
||||||
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin)) {
|
S3Repository s3repo = createAndStartRepository(metadata, s3Plugin, mock(ThreadPool.class))) {
|
||||||
try (AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
|
try (AmazonS3Reference s3Ref = ((S3BlobStore) s3repo.blobStore()).clientReference()) {
|
||||||
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials
|
final AWSCredentials credentials = ((ProxyS3RepositoryPlugin.ClientAndCredentials) s3Ref.client()).credentials
|
||||||
.getCredentials();
|
.getCredentials();
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.rest.RestRequest;
|
||||||
import org.elasticsearch.rest.RestResponse;
|
import org.elasticsearch.rest.RestResponse;
|
||||||
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
|
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
|
||||||
import org.elasticsearch.test.rest.FakeRestRequest;
|
import org.elasticsearch.test.rest.FakeRestRequest;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
@ -114,14 +115,15 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry) {
|
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
|
||||||
|
final ThreadPool threadPool) {
|
||||||
return Collections.singletonMap(S3Repository.TYPE,
|
return Collections.singletonMap(S3Repository.TYPE,
|
||||||
(metadata) -> new S3Repository(metadata, env.settings(), registry, new S3Service() {
|
metadata -> new S3Repository(metadata, env.settings(), registry, new S3Service() {
|
||||||
@Override
|
@Override
|
||||||
AmazonS3 buildClient(S3ClientSettings clientSettings) {
|
AmazonS3 buildClient(S3ClientSettings clientSettings) {
|
||||||
return new MockAmazonS3(blobs, bucket, serverSideEncryption, cannedACL, storageClass);
|
return new MockAmazonS3(blobs, bucket, serverSideEncryption, cannedACL, storageClass);
|
||||||
}
|
}
|
||||||
}));
|
}, threadPool));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -35,6 +36,7 @@ import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
public class S3RepositoryTests extends ESTestCase {
|
public class S3RepositoryTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -118,7 +120,7 @@ public class S3RepositoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private S3Repository createS3Repo(RepositoryMetaData metadata) {
|
private S3Repository createS3Repo(RepositoryMetaData metadata) {
|
||||||
return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service()) {
|
return new S3Repository(metadata, Settings.EMPTY, NamedXContentRegistry.EMPTY, new DummyS3Service(), mock(ThreadPool.class)) {
|
||||||
@Override
|
@Override
|
||||||
protected void assertSnapshotOrGenericThread() {
|
protected void assertSnapshotOrGenericThread() {
|
||||||
// eliminate thread name check as we create repo manually on test/main threads
|
// eliminate thread name check as we create repo manually on test/main threads
|
||||||
|
|
|
@ -49,7 +49,9 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String executor() {
|
protected String executor() {
|
||||||
return ThreadPool.Names.SNAPSHOT;
|
// Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks
|
||||||
|
// which would block the request from getting an error response because of the ongoing task
|
||||||
|
return ThreadPool.Names.GENERIC;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -49,7 +49,9 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String executor() {
|
protected String executor() {
|
||||||
return ThreadPool.Names.SNAPSHOT;
|
// Using the generic instead of the snapshot threadpool here as the snapshot threadpool might be blocked on long running tasks
|
||||||
|
// which would block the request from getting an error response because of the ongoing task
|
||||||
|
return ThreadPool.Names.GENERIC;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Map;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An extension point for {@link Plugin} implementations to add custom snapshot repositories.
|
* An extension point for {@link Plugin} implementations to add custom snapshot repositories.
|
||||||
|
@ -39,7 +40,8 @@ public interface RepositoryPlugin {
|
||||||
* The key of the returned {@link Map} is the type name of the repository and
|
* The key of the returned {@link Map} is the type name of the repository and
|
||||||
* the value is a factory to construct the {@link Repository} interface.
|
* the value is a factory to construct the {@link Repository} interface.
|
||||||
*/
|
*/
|
||||||
default Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
default Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
|
ThreadPool threadPool) {
|
||||||
return Collections.emptyMap();
|
return Collections.emptyMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +54,8 @@ public interface RepositoryPlugin {
|
||||||
* The key of the returned {@link Map} is the type name of the repository and
|
* The key of the returned {@link Map} is the type name of the repository and
|
||||||
* the value is a factory to construct the {@link Repository} interface.
|
* the value is a factory to construct the {@link Repository} interface.
|
||||||
*/
|
*/
|
||||||
default Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
default Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
|
ThreadPool threadPool) {
|
||||||
return Collections.emptyMap();
|
return Collections.emptyMap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.repositories;
|
||||||
|
|
||||||
import org.apache.lucene.index.IndexCommit;
|
import org.apache.lucene.index.IndexCommit;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||||
|
@ -84,8 +85,8 @@ public class FilterRepository implements Repository {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
|
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
|
||||||
in.deleteSnapshot(snapshotId, repositoryStateId);
|
in.deleteSnapshot(snapshotId, repositoryStateId, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -47,10 +47,10 @@ public class RepositoriesModule extends AbstractModule {
|
||||||
public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, TransportService transportService,
|
public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, TransportService transportService,
|
||||||
ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) {
|
ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) {
|
||||||
Map<String, Repository.Factory> factories = new HashMap<>();
|
Map<String, Repository.Factory> factories = new HashMap<>();
|
||||||
factories.put(FsRepository.TYPE, (metadata) -> new FsRepository(metadata, env, namedXContentRegistry));
|
factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, threadPool));
|
||||||
|
|
||||||
for (RepositoryPlugin repoPlugin : repoPlugins) {
|
for (RepositoryPlugin repoPlugin : repoPlugins) {
|
||||||
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry);
|
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, threadPool);
|
||||||
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
|
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
|
||||||
if (factories.put(entry.getKey(), entry.getValue()) != null) {
|
if (factories.put(entry.getKey(), entry.getValue()) != null) {
|
||||||
throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered");
|
throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered");
|
||||||
|
@ -60,7 +60,7 @@ public class RepositoriesModule extends AbstractModule {
|
||||||
|
|
||||||
Map<String, Repository.Factory> internalFactories = new HashMap<>();
|
Map<String, Repository.Factory> internalFactories = new HashMap<>();
|
||||||
for (RepositoryPlugin repoPlugin : repoPlugins) {
|
for (RepositoryPlugin repoPlugin : repoPlugins) {
|
||||||
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry);
|
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, threadPool);
|
||||||
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
|
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
|
||||||
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) {
|
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) {
|
||||||
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered");
|
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered");
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.repositories;
|
||||||
|
|
||||||
import org.apache.lucene.index.IndexCommit;
|
import org.apache.lucene.index.IndexCommit;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||||
|
@ -140,8 +141,9 @@ public interface Repository extends LifecycleComponent {
|
||||||
*
|
*
|
||||||
* @param snapshotId snapshot id
|
* @param snapshotId snapshot id
|
||||||
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
|
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
|
||||||
|
* @param listener completion listener
|
||||||
*/
|
*/
|
||||||
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId);
|
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns snapshot throttle time in nanoseconds
|
* Returns snapshot throttle time in nanoseconds
|
||||||
|
|
|
@ -100,11 +100,11 @@ public abstract class BlobStoreFormat<T extends ToXContent> {
|
||||||
/**
|
/**
|
||||||
* Checks obj in the blob container
|
* Checks obj in the blob container
|
||||||
*/
|
*/
|
||||||
public boolean exists(BlobContainer blobContainer, String name) throws IOException {
|
public boolean exists(BlobContainer blobContainer, String name) {
|
||||||
return blobContainer.blobExists(blobName(name));
|
return blobContainer.blobExists(blobName(name));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String blobName(String name) {
|
public String blobName(String name) {
|
||||||
return String.format(Locale.ROOT, blobNameFormat, name);
|
return String.format(Locale.ROOT, blobNameFormat, name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,8 +31,10 @@ import org.apache.lucene.util.BytesRefBuilder;
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.ResourceNotFoundException;
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.ActionRunnable;
|
||||||
|
import org.elasticsearch.action.support.GroupedActionListener;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||||
|
@ -103,9 +105,12 @@ import java.io.InputStream;
|
||||||
import java.nio.file.FileAlreadyExistsException;
|
import java.nio.file.FileAlreadyExistsException;
|
||||||
import java.nio.file.NoSuchFileException;
|
import java.nio.file.NoSuchFileException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -163,6 +168,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
|
|
||||||
protected final NamedXContentRegistry namedXContentRegistry;
|
protected final NamedXContentRegistry namedXContentRegistry;
|
||||||
|
|
||||||
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
private static final int BUFFER_SIZE = 4096;
|
private static final int BUFFER_SIZE = 4096;
|
||||||
|
|
||||||
private static final String SNAPSHOT_PREFIX = "snap-";
|
private static final String SNAPSHOT_PREFIX = "snap-";
|
||||||
|
@ -225,17 +232,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs new BlobStoreRepository
|
* Constructs new BlobStoreRepository
|
||||||
*
|
|
||||||
* @param metadata The metadata for this repository including name and settings
|
* @param metadata The metadata for this repository including name and settings
|
||||||
* @param settings Settings for the node this repository object is created on
|
* @param settings Settings for the node this repository object is created on
|
||||||
* @param compress true if metadata and snapshot files should be compressed
|
* @param threadPool Threadpool to run long running repository manipulations on asynchronously
|
||||||
*/
|
*/
|
||||||
protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, boolean compress,
|
protected BlobStoreRepository(RepositoryMetaData metadata, Settings settings, boolean compress,
|
||||||
NamedXContentRegistry namedXContentRegistry) {
|
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
|
||||||
this.settings = settings;
|
this.settings = settings;
|
||||||
this.compress = compress;
|
this.compress = compress;
|
||||||
this.metadata = metadata;
|
this.metadata = metadata;
|
||||||
this.namedXContentRegistry = namedXContentRegistry;
|
this.namedXContentRegistry = namedXContentRegistry;
|
||||||
|
this.threadPool = threadPool;
|
||||||
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
|
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
|
||||||
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
|
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
|
||||||
readOnly = metadata.settings().getAsBoolean("readonly", false);
|
readOnly = metadata.settings().getAsBoolean("readonly", false);
|
||||||
|
@ -405,108 +412,98 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
|
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
|
||||||
if (isReadOnly()) {
|
if (isReadOnly()) {
|
||||||
throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository");
|
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
|
||||||
}
|
} else {
|
||||||
|
|
||||||
final RepositoryData repositoryData = getRepositoryData();
|
|
||||||
SnapshotInfo snapshot = null;
|
SnapshotInfo snapshot = null;
|
||||||
try {
|
try {
|
||||||
snapshot = getSnapshotInfo(snapshotId);
|
snapshot = getSnapshotInfo(snapshotId);
|
||||||
} catch (SnapshotMissingException ex) {
|
} catch (SnapshotMissingException ex) {
|
||||||
throw ex;
|
listener.onFailure(ex);
|
||||||
|
return;
|
||||||
} catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) {
|
} catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) {
|
||||||
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
|
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
|
||||||
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
|
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
|
||||||
final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
|
final RepositoryData repositoryData;
|
||||||
|
final RepositoryData updatedRepositoryData;
|
||||||
|
try {
|
||||||
|
repositoryData = getRepositoryData();
|
||||||
|
updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
|
||||||
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
||||||
|
} catch (Exception ex) {
|
||||||
// delete the snapshot file
|
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
|
||||||
deleteSnapshotBlobIgnoringErrors(snapshot, snapshotId.getUUID());
|
return;
|
||||||
// delete the global metadata file
|
}
|
||||||
deleteGlobalMetaDataBlobIgnoringErrors(snapshot, snapshotId.getUUID());
|
final SnapshotInfo finalSnapshotInfo = snapshot;
|
||||||
|
final Collection<IndexId> unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values());
|
||||||
// Now delete all indices
|
unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values());
|
||||||
if (snapshot != null) {
|
|
||||||
final List<String> indices = snapshot.indices();
|
|
||||||
for (String index : indices) {
|
|
||||||
final IndexId indexId = repositoryData.resolveIndexId(index);
|
|
||||||
|
|
||||||
IndexMetaData indexMetaData = null;
|
|
||||||
try {
|
try {
|
||||||
indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
|
blobContainer().deleteBlobsIgnoringIfNotExists(
|
||||||
} catch (ElasticsearchParseException | IOException ex) {
|
Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID())));
|
||||||
logger.warn(() ->
|
} catch (IOException e) {
|
||||||
new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex);
|
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e);
|
||||||
}
|
}
|
||||||
|
deleteIndices(
|
||||||
deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId);
|
Optional.ofNullable(finalSnapshotInfo)
|
||||||
|
.map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList()))
|
||||||
if (indexMetaData != null) {
|
.orElse(Collections.emptyList()),
|
||||||
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
|
snapshotId,
|
||||||
|
ActionListener.map(listener, v -> {
|
||||||
try {
|
try {
|
||||||
delete(snapshotId, indexId, new ShardId(indexMetaData.getIndex(), shardId));
|
blobStore().blobContainer(basePath().add("indices")).deleteBlobsIgnoringIfNotExists(
|
||||||
} catch (SnapshotException ex) {
|
unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList()));
|
||||||
final int finalShardId = shardId;
|
} catch (IOException e) {
|
||||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
|
|
||||||
snapshotId, index, finalShardId), ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// cleanup indices that are no longer part of the repository
|
|
||||||
final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
|
|
||||||
indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
|
|
||||||
final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
|
|
||||||
try {
|
|
||||||
indicesBlobContainer.deleteBlobsIgnoringIfNotExists(
|
|
||||||
indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
// a different IOException occurred while trying to delete - will just log the issue for now
|
|
||||||
logger.warn(() ->
|
logger.warn(() ->
|
||||||
new ParameterizedMessage(
|
new ParameterizedMessage(
|
||||||
"[{}] indices {} are no longer part of any snapshots in the repository, " +
|
"[{}] indices {} are no longer part of any snapshots in the repository, " +
|
||||||
"but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
|
"but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e);
|
||||||
}
|
}
|
||||||
} catch (IOException | ResourceNotFoundException ex) {
|
return null;
|
||||||
throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
|
})
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) {
|
private void deleteIndices(List<IndexId> indices, SnapshotId snapshotId, ActionListener<Void> listener) {
|
||||||
|
if (indices.isEmpty()) {
|
||||||
|
listener.onResponse(null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final ActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size());
|
||||||
|
for (IndexId indexId: indices) {
|
||||||
|
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<Void>(groupedListener) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doRun() {
|
||||||
|
IndexMetaData indexMetaData = null;
|
||||||
try {
|
try {
|
||||||
snapshotFormat.delete(blobContainer(), blobId);
|
indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
|
||||||
} catch (IOException e) {
|
} catch (Exception ex) {
|
||||||
if (snapshotInfo != null) {
|
logger.warn(() ->
|
||||||
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete snapshot file [{}]",
|
new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
|
||||||
snapshotInfo.snapshotId(), blobId), e);
|
|
||||||
} else {
|
|
||||||
logger.warn(() -> new ParameterizedMessage("Unable to delete snapshot file [{}]", blobId), e);
|
|
||||||
}
|
}
|
||||||
}
|
deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId);
|
||||||
}
|
if (indexMetaData != null) {
|
||||||
|
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
|
||||||
private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) {
|
|
||||||
try {
|
try {
|
||||||
globalMetaDataFormat.delete(blobContainer(), blobId);
|
final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId);
|
||||||
} catch (IOException e) {
|
new Context(snapshotId, indexId, sid, sid).delete();
|
||||||
if (snapshotInfo != null) {
|
} catch (SnapshotException ex) {
|
||||||
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata file [{}]",
|
final int finalShardId = shardId;
|
||||||
snapshotInfo.snapshotId(), blobId), e);
|
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
|
||||||
} else {
|
snapshotId, indexId.getName(), finalShardId), ex);
|
||||||
logger.warn(() -> new ParameterizedMessage("Unable to delete global metadata file [{}]", blobId), e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
groupedListener.onResponse(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void deleteIndexMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final IndexId indexId) {
|
private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) {
|
||||||
final SnapshotId snapshotId = snapshotInfo.snapshotId();
|
|
||||||
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()));
|
BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()));
|
||||||
try {
|
try {
|
||||||
indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID());
|
indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID());
|
||||||
|
@ -904,17 +901,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Delete shard snapshot
|
|
||||||
*
|
|
||||||
* @param snapshotId snapshot id
|
|
||||||
* @param shardId shard id
|
|
||||||
*/
|
|
||||||
private void delete(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
|
|
||||||
Context context = new Context(snapshotId, indexId, shardId, shardId);
|
|
||||||
context.delete();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "BlobStoreRepository[" +
|
return "BlobStoreRepository[" +
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
@ -73,9 +74,9 @@ public class FsRepository extends BlobStoreRepository {
|
||||||
/**
|
/**
|
||||||
* Constructs a shared file system repository.
|
* Constructs a shared file system repository.
|
||||||
*/
|
*/
|
||||||
public FsRepository(RepositoryMetaData metadata, Environment environment,
|
public FsRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
|
||||||
NamedXContentRegistry namedXContentRegistry) {
|
ThreadPool threadPool) {
|
||||||
super(metadata, environment.settings(), calculateCompress(metadata, environment), namedXContentRegistry);
|
super(metadata, environment.settings(), calculateCompress(metadata, environment), namedXContentRegistry, threadPool);
|
||||||
this.environment = environment;
|
this.environment = environment;
|
||||||
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
|
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
|
||||||
if (location.isEmpty()) {
|
if (location.isEmpty()) {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.lucene.util.CollectionUtil;
|
import org.apache.lucene.util.CollectionUtil;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.ActionRunnable;
|
||||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
|
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
@ -1308,15 +1309,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||||
* @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
|
* @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
|
||||||
*/
|
*/
|
||||||
private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId) {
|
private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId) {
|
||||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
|
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<Void>(listener) {
|
||||||
try {
|
@Override
|
||||||
|
protected void doRun() {
|
||||||
Repository repository = repositoriesService.repository(snapshot.getRepository());
|
Repository repository = repositoriesService.repository(snapshot.getRepository());
|
||||||
repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId);
|
repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> {
|
||||||
logger.info("snapshot [{}] deleted", snapshot);
|
logger.info("snapshot [{}] deleted", snapshot);
|
||||||
|
|
||||||
removeSnapshotDeletionFromClusterState(snapshot, null, listener);
|
removeSnapshotDeletionFromClusterState(snapshot, null, listener);
|
||||||
} catch (Exception ex) {
|
}, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, listener)
|
||||||
removeSnapshotDeletionFromClusterState(snapshot, ex, listener);
|
));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,12 +43,14 @@ public class RepositoriesModuleTests extends ESTestCase {
|
||||||
private RepositoryPlugin plugin1;
|
private RepositoryPlugin plugin1;
|
||||||
private RepositoryPlugin plugin2;
|
private RepositoryPlugin plugin2;
|
||||||
private Repository.Factory factory;
|
private Repository.Factory factory;
|
||||||
|
private ThreadPool threadPool;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
environment = mock(Environment.class);
|
environment = mock(Environment.class);
|
||||||
contentRegistry = mock(NamedXContentRegistry.class);
|
contentRegistry = mock(NamedXContentRegistry.class);
|
||||||
|
threadPool = mock(ThreadPool.class);
|
||||||
plugin1 = mock(RepositoryPlugin.class);
|
plugin1 = mock(RepositoryPlugin.class);
|
||||||
plugin2 = mock(RepositoryPlugin.class);
|
plugin2 = mock(RepositoryPlugin.class);
|
||||||
factory = mock(Repository.Factory.class);
|
factory = mock(Repository.Factory.class);
|
||||||
|
@ -58,43 +60,46 @@ public class RepositoriesModuleTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCanRegisterTwoRepositoriesWithDifferentTypes() {
|
public void testCanRegisterTwoRepositoriesWithDifferentTypes() {
|
||||||
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
|
when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory));
|
||||||
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory));
|
when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type2", factory));
|
||||||
|
|
||||||
// Would throw
|
// Would throw
|
||||||
new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
|
new RepositoriesModule(
|
||||||
mock(ThreadPool.class), contentRegistry);
|
environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class), threadPool, contentRegistry);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCannotRegisterTwoRepositoriesWithSameTypes() {
|
public void testCannotRegisterTwoRepositoriesWithSameTypes() {
|
||||||
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
|
when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory));
|
||||||
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
|
when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory));
|
||||||
|
|
||||||
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
||||||
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
|
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
|
||||||
mock(ThreadPool.class), contentRegistry));
|
threadPool, contentRegistry));
|
||||||
|
|
||||||
assertEquals("Repository type [type1] is already registered", ex.getMessage());
|
assertEquals("Repository type [type1] is already registered", ex.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() {
|
public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() {
|
||||||
when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
|
when(plugin1.getInternalRepositories(environment, contentRegistry, threadPool))
|
||||||
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
|
.thenReturn(Collections.singletonMap("type1", factory));
|
||||||
|
when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool))
|
||||||
|
.thenReturn(Collections.singletonMap("type1", factory));
|
||||||
|
|
||||||
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
||||||
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
|
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
|
||||||
mock(ThreadPool.class), contentRegistry));
|
threadPool, contentRegistry));
|
||||||
|
|
||||||
assertEquals("Internal repository type [type1] is already registered", ex.getMessage());
|
assertEquals("Internal repository type [type1] is already registered", ex.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() {
|
public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() {
|
||||||
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
|
when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory));
|
||||||
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
|
when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool))
|
||||||
|
.thenReturn(Collections.singletonMap("type1", factory));
|
||||||
|
|
||||||
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
||||||
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
|
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
|
||||||
mock(ThreadPool.class), contentRegistry));
|
threadPool, contentRegistry));
|
||||||
|
|
||||||
assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage());
|
assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage());
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.repositories;
|
||||||
|
|
||||||
import org.apache.lucene.index.IndexCommit;
|
import org.apache.lucene.index.IndexCommit;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||||
|
@ -149,8 +150,8 @@ public class RepositoriesServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
|
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
|
||||||
|
listener.onResponse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -180,7 +180,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
|
||||||
private Repository createRepository() {
|
private Repository createRepository() {
|
||||||
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
|
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
|
||||||
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
|
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
|
||||||
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()) {
|
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool) {
|
||||||
@Override
|
@Override
|
||||||
protected void assertSnapshotOrGenericThread() {
|
protected void assertSnapshotOrGenericThread() {
|
||||||
// eliminate thread name check as we create repo manually
|
// eliminate thread name check as we create repo manually
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.snapshots.SnapshotId;
|
||||||
import org.elasticsearch.snapshots.SnapshotState;
|
import org.elasticsearch.snapshots.SnapshotState;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
@ -67,12 +68,13 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
// the reason for this plug-in is to drop any assertSnapshotOrGenericThread as mostly all access in this test goes from test threads
|
// the reason for this plug-in is to drop any assertSnapshotOrGenericThread as mostly all access in this test goes from test threads
|
||||||
public static class FsLikeRepoPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
|
public static class FsLikeRepoPlugin extends Plugin implements RepositoryPlugin {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
|
ThreadPool threadPool) {
|
||||||
return Collections.singletonMap(REPO_TYPE,
|
return Collections.singletonMap(REPO_TYPE,
|
||||||
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry) {
|
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, threadPool) {
|
||||||
@Override
|
@Override
|
||||||
protected void assertSnapshotOrGenericThread() {
|
protected void assertSnapshotOrGenericThread() {
|
||||||
// eliminate thread name check as we access blobStore on test/main threads
|
// eliminate thread name check as we access blobStore on test/main threads
|
||||||
|
@ -260,7 +262,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
||||||
Environment useCompressEnvironment =
|
Environment useCompressEnvironment =
|
||||||
new Environment(useCompressSettings, node().getEnvironment().configFile());
|
new Environment(useCompressSettings, node().getEnvironment().configFile());
|
||||||
|
|
||||||
new FsRepository(metaData, useCompressEnvironment, null);
|
new FsRepository(metaData, useCompressEnvironment, null, null);
|
||||||
|
|
||||||
assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" +
|
assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" +
|
||||||
" See the breaking changes documentation for the next major version.");
|
" See the breaking changes documentation for the next major version.");
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.repositories.RepositoriesService;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -187,8 +188,8 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
|
||||||
|
|
||||||
public CountingMockRepository(final RepositoryMetaData metadata,
|
public CountingMockRepository(final RepositoryMetaData metadata,
|
||||||
final Environment environment,
|
final Environment environment,
|
||||||
final NamedXContentRegistry namedXContentRegistry) throws IOException {
|
final NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
|
||||||
super(metadata, environment, namedXContentRegistry);
|
super(metadata, environment, namedXContentRegistry, threadPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -207,8 +208,10 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
|
||||||
/** A plugin that uses CountingMockRepository as implementation of the Repository **/
|
/** A plugin that uses CountingMockRepository as implementation of the Repository **/
|
||||||
public static class CountingMockRepositoryPlugin extends MockRepository.Plugin {
|
public static class CountingMockRepositoryPlugin extends MockRepository.Plugin {
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
return Collections.singletonMap("coutingmock", (metadata) -> new CountingMockRepository(metadata, env, namedXContentRegistry));
|
ThreadPool threadPool) {
|
||||||
|
return Collections.singletonMap("coutingmock",
|
||||||
|
metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, threadPool));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -889,7 +889,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||||
repositoriesService = new RepositoriesService(
|
repositoriesService = new RepositoriesService(
|
||||||
settings, clusterService, transportService,
|
settings, clusterService, transportService,
|
||||||
Collections.singletonMap(FsRepository.TYPE, metaData -> {
|
Collections.singletonMap(FsRepository.TYPE, metaData -> {
|
||||||
final Repository repository = new FsRepository(metaData, environment, xContentRegistry()) {
|
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
|
||||||
@Override
|
@Override
|
||||||
protected void assertSnapshotOrGenericThread() {
|
protected void assertSnapshotOrGenericThread() {
|
||||||
// eliminate thread name check as we create repo in the test thread
|
// eliminate thread name check as we create repo in the test thread
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.repositories.IndexId;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
import org.elasticsearch.repositories.fs.FsRepository;
|
import org.elasticsearch.repositories.fs.FsRepository;
|
||||||
import org.elasticsearch.snapshots.SnapshotId;
|
import org.elasticsearch.snapshots.SnapshotId;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -69,8 +70,9 @@ public class MockRepository extends FsRepository {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry));
|
ThreadPool threadPool) {
|
||||||
|
return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry, threadPool));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -113,8 +115,8 @@ public class MockRepository extends FsRepository {
|
||||||
private volatile boolean blocked = false;
|
private volatile boolean blocked = false;
|
||||||
|
|
||||||
public MockRepository(RepositoryMetaData metadata, Environment environment,
|
public MockRepository(RepositoryMetaData metadata, Environment environment,
|
||||||
NamedXContentRegistry namedXContentRegistry) throws IOException {
|
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
|
||||||
super(overrideSettings(metadata, environment), environment, namedXContentRegistry);
|
super(overrideSettings(metadata, environment), environment, namedXContentRegistry, threadPool);
|
||||||
randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0);
|
randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0);
|
||||||
randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
|
randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
|
||||||
useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false);
|
useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false);
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
import org.apache.lucene.index.IndexCommit;
|
import org.apache.lucene.index.IndexCommit;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||||
|
@ -103,7 +104,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
|
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
|
||||||
|
listener.onResponse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -132,7 +132,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
||||||
private final CcrLicenseChecker ccrLicenseChecker;
|
private final CcrLicenseChecker ccrLicenseChecker;
|
||||||
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
|
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
|
||||||
private final SetOnce<CcrSettings> ccrSettings = new SetOnce<>();
|
private final SetOnce<CcrSettings> ccrSettings = new SetOnce<>();
|
||||||
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();
|
|
||||||
private Client client;
|
private Client client;
|
||||||
private final boolean transportClientMode;
|
private final boolean transportClientMode;
|
||||||
|
|
||||||
|
@ -177,7 +176,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
||||||
|
|
||||||
CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings());
|
CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings());
|
||||||
this.ccrSettings.set(ccrSettings);
|
this.ccrSettings.set(ccrSettings);
|
||||||
this.threadPool.set(threadPool);
|
|
||||||
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings);
|
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings);
|
||||||
this.restoreSourceService.set(restoreSourceService);
|
this.restoreSourceService.set(restoreSourceService);
|
||||||
return Arrays.asList(
|
return Arrays.asList(
|
||||||
|
@ -326,9 +324,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
|
ThreadPool threadPool) {
|
||||||
Repository.Factory repositoryFactory =
|
Repository.Factory repositoryFactory =
|
||||||
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get());
|
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool);
|
||||||
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
|
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -260,7 +260,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
|
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener<Void> listener) {
|
||||||
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
|
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -357,7 +357,8 @@ public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
|
ThreadPool threadPool) {
|
||||||
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
|
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.search.SearchHits;
|
||||||
import org.elasticsearch.search.slice.SliceBuilder;
|
import org.elasticsearch.search.slice.SliceBuilder;
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.search.sort.SortOrder;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -72,7 +73,8 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
|
||||||
|
|
||||||
public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin {
|
public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin {
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
|
ThreadPool threadPool) {
|
||||||
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
|
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -328,7 +328,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
|
||||||
private Repository createRepository() throws IOException {
|
private Repository createRepository() throws IOException {
|
||||||
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
|
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
|
||||||
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
|
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
|
||||||
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry());
|
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {
|
private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {
|
||||||
|
|
|
@ -394,17 +394,20 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
HashMap<String, Repository.Factory> repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry));
|
ThreadPool threadPool) {
|
||||||
filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry)));
|
HashMap<String, Repository.Factory> repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry, threadPool));
|
||||||
|
filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, threadPool)));
|
||||||
return repositories;
|
return repositories;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
|
||||||
HashMap<String, Repository.Factory> internalRepositories = new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry));
|
ThreadPool threadPool) {
|
||||||
|
HashMap<String, Repository.Factory> internalRepositories =
|
||||||
|
new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, threadPool));
|
||||||
filterPlugins(RepositoryPlugin.class).forEach(r ->
|
filterPlugins(RepositoryPlugin.class).forEach(r ->
|
||||||
internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry)));
|
internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, threadPool)));
|
||||||
return internalRepositories;
|
return internalRepositories;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue