Make BlobStoreRepository Aware of ClusterState (#49639) (#49711)

* Make BlobStoreRepository Aware of ClusterState (#49639)

This is a preliminary to #49060.

It does not introduce any substantial behavior change to how the blob store repository
operates. What it does is to add all the infrastructure changes around passing the cluster service to the blob store, associated test changes and a best effort approach to tracking the latest repository generation on all nodes from cluster state updates. This brings a slight improvement to the consistency
by which non-master nodes (or master directly after a failover) will be able to determine the latest repository generation. It does not however do any tricky checks for the situation after a repository operation
(create, delete or cleanup) that could theoretically be used to get even greater accuracy to keep this change simple.
This change does not in any way alter the behavior of the blobstore repository other than adding a better "guess" for the value of the latest repo generation and is mainly intended to isolate the actual logical change to how the
repository operates in #49060
This commit is contained in:
Armin Braun 2019-11-29 14:57:47 +01:00 committed by GitHub
parent 3d525e18f9
commit 813b49adb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 327 additions and 154 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.plugin.repository.url;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
@ -26,7 +27,6 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.url.URLRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
@ -46,8 +46,8 @@ public class URLRepositoryPlugin extends Plugin implements RepositoryPlugin {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap(URLRepository.TYPE,
metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool));
metadata -> new URLRepository(metadata, env, namedXContentRegistry, clusterService));
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories.url;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@ -33,7 +34,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
@ -83,8 +83,8 @@ public class URLRepository extends BlobStoreRepository {
* Constructs a read-only URL-based repository
*/
public URLRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
super(metadata, false, namedXContentRegistry, threadPool);
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
super(metadata, false, namedXContentRegistry, clusterService);
if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");

View File

@ -25,8 +25,8 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Path;
@ -35,13 +35,12 @@ import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.mockito.Mockito.mock;
public class URLRepositoryTests extends ESTestCase {
private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) {
return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) {
new NamedXContentRegistry(Collections.emptyList()), BlobStoreTestUtil.mockClusterService()) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@ -32,7 +33,6 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Locale;
import java.util.function.Function;
@ -81,8 +81,8 @@ public class AzureRepository extends BlobStoreRepository {
final RepositoryMetaData metadata,
final NamedXContentRegistry namedXContentRegistry,
final AzureStorageService storageService,
final ThreadPool threadPool) {
super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
final ClusterService clusterService) {
super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService);
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.repositories.azure;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
@ -31,7 +32,6 @@ import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
@ -60,9 +60,9 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap(AzureRepository.TYPE,
(metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, threadPool));
(metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, clusterService));
}
@Override

View File

@ -26,8 +26,8 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@ -42,8 +42,7 @@ public class AzureRepositorySettingsTests extends ESTestCase {
.put(settings)
.build();
final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings),
NamedXContentRegistry.EMPTY, mock(AzureStorageService.class),
mock(ThreadPool.class));
NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), BlobStoreTestUtil.mockClusterService());
assertThat(azureRepository.getBlobStore(), is(nullValue()));
return azureRepository;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.repositories.gcs;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -27,7 +28,6 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
@ -52,9 +52,9 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, threadPool));
metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, clusterService));
}
@Override

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories.gcs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.Setting;
@ -30,7 +31,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.function.Function;
@ -68,8 +68,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
final RepositoryMetaData metadata,
final NamedXContentRegistry namedXContentRegistry,
final GoogleCloudStorageService storageService,
final ThreadPool threadPool) {
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, threadPool);
final ClusterService clusterService) {
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService);
this.storageService = storageService;
String basePath = BASE_PATH.get(metadata.settings());

View File

@ -27,6 +27,7 @@ import com.sun.net.httpserver.HttpHandler;
import fixture.gcs.FakeOAuth2HttpHandler;
import fixture.gcs.GoogleCloudStorageHttpHandler;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.MockSecureSettings;
@ -38,7 +39,6 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.threeten.bp.Duration;
import java.io.IOException;
@ -171,9 +171,10 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
}
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry registry, ThreadPool threadPool) {
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry registry,
ClusterService clusterService) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, threadPool) {
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService) {
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore("bucket", "test", storageService) {

View File

@ -30,13 +30,13 @@ import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityUtil;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
public final class HdfsPlugin extends Plugin implements RepositoryPlugin {
@ -112,7 +112,7 @@ public final class HdfsPlugin extends Plugin implements RepositoryPlugin {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool));
ClusterService clusterService) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService));
}
}

View File

@ -31,6 +31,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobPath;
@ -40,7 +41,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -71,8 +71,8 @@ public final class HdfsRepository extends BlobStoreRepository {
final RepositoryMetaData metadata,
final Environment environment,
final NamedXContentRegistry namedXContentRegistry,
final ThreadPool threadPool) {
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, threadPool);
final ClusterService clusterService) {
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService);
this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories.s3;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@ -35,7 +36,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.function.Function;
@ -172,8 +172,8 @@ class S3Repository extends BlobStoreRepository {
final RepositoryMetaData metadata,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service,
final ThreadPool threadPool) {
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, threadPool);
final ClusterService clusterService) {
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService);
this.service = service;
this.repositoryMetaData = metadata;

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories.s3;
import com.amazonaws.util.json.Jackson;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -30,7 +31,6 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.security.AccessController;
@ -79,14 +79,14 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
protected S3Repository createRepository(
final RepositoryMetaData metadata,
final NamedXContentRegistry registry,
final ThreadPool threadPool) {
return new S3Repository(metadata, registry, service, threadPool);
final ClusterService clusterService) {
return new S3Repository(metadata, registry, service, clusterService);
}
@Override
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
final ThreadPool threadPool) {
return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, threadPool));
final ClusterService clusterService) {
return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, clusterService));
}
@Override

View File

@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
@ -41,7 +42,6 @@ import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.ThreadPool;
import java.security.AccessController;
import java.security.PrivilegedAction;
@ -271,8 +271,8 @@ public class RepositoryCredentialsTests extends ESSingleNodeTestCase {
@Override
protected S3Repository createRepository(RepositoryMetaData metadata,
NamedXContentRegistry registry, ThreadPool threadPool) {
return new S3Repository(metadata, registry, service, threadPool) {
NamedXContentRegistry registry, ClusterService clusterService) {
return new S3Repository(metadata, registry, service, clusterService) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads

View File

@ -23,6 +23,7 @@ import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
@ -35,7 +36,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Collection;
@ -109,8 +109,9 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
}
@Override
protected S3Repository createRepository(RepositoryMetaData metadata, NamedXContentRegistry registry, ThreadPool threadPool) {
return new S3Repository(metadata, registry, service, threadPool) {
protected S3Repository createRepository(RepositoryMetaData metadata, NamedXContentRegistry registry,
ClusterService clusterService) {
return new S3Repository(metadata, registry, service, clusterService) {
@Override
public BlobStore blobStore() {

View File

@ -26,8 +26,8 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;
import java.util.Map;
@ -36,7 +36,6 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
public class S3RepositoryTests extends ESTestCase {
@ -120,7 +119,7 @@ public class S3RepositoryTests extends ESTestCase {
}
private S3Repository createS3Repo(RepositoryMetaData metadata) {
return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), mock(ThreadPool.class)) {
return new S3Repository(metadata, NamedXContentRegistry.EMPTY, new DummyS3Service(), BlobStoreTestUtil.mockClusterService()) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.RepositoryOperation;
import java.io.IOException;
import java.util.ArrayList;
@ -95,7 +96,7 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<Clu
return Version.V_7_4_0;
}
public static final class Entry implements Writeable {
public static final class Entry implements Writeable, RepositoryOperation {
private final String repository;
@ -111,6 +112,12 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<Clu
this.repositoryStateId = repositoryStateId;
}
@Override
public long repositoryStateId() {
return repositoryStateId;
}
@Override
public String repository() {
return repository;
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.Snapshot;
import java.io.IOException;
@ -164,7 +165,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
/**
* A class representing a snapshot deletion request entry in the cluster state.
*/
public static final class Entry implements Writeable {
public static final class Entry implements Writeable, RepositoryOperation {
private final Snapshot snapshot;
private final long startTime;
private final long repositoryStateId;
@ -195,13 +196,6 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
return startTime;
}
/**
* The repository state id at the time the snapshot deletion began.
*/
public long getRepositoryStateId() {
return repositoryStateId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -227,5 +221,15 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
out.writeVLong(startTime);
out.writeLong(repositoryStateId);
}
@Override
public String repository() {
return snapshot.getRepository();
}
@Override
public long repositoryStateId() {
return repositoryStateId;
}
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotsService;
@ -81,7 +82,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return builder.append("]").toString();
}
public static class Entry implements ToXContent {
public static class Entry implements ToXContent, RepositoryOperation {
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
@ -153,6 +154,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
this(entry, entry.state, shards, entry.failure);
}
@Override
public String repository() {
return snapshot.getRepository();
}
public Snapshot snapshot() {
return this.snapshot;
}
@ -189,7 +195,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return startTime;
}
public long getRepositoryStateId() {
@Override
public long repositoryStateId() {
return repositoryStateId;
}

View File

@ -22,10 +22,10 @@ package org.elasticsearch.plugins;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;
/**
* An extension point for {@link Plugin} implementations to add custom snapshot repositories.
@ -41,7 +41,7 @@ public interface RepositoryPlugin {
* the value is a factory to construct the {@link Repository} interface.
*/
default Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.emptyMap();
}
@ -55,7 +55,7 @@ public interface RepositoryPlugin {
* the value is a factory to construct the {@link Repository} interface.
*/
default Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.emptyMap();
}
}

View File

@ -20,6 +20,7 @@ package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@ -138,6 +139,11 @@ public class FilterRepository implements Repository {
return in.getShardSnapshotStatus(snapshotId, indexId, shardId);
}
@Override
public void updateState(ClusterState state) {
in.updateState(state);
}
@Override
public Lifecycle.State lifecycleState() {
return in.lifecycleState();

View File

@ -43,10 +43,10 @@ public final class RepositoriesModule {
public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, NamedXContentRegistry namedXContentRegistry) {
Map<String, Repository.Factory> factories = new HashMap<>();
factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, threadPool));
factories.put(FsRepository.TYPE, metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService));
for (RepositoryPlugin repoPlugin : repoPlugins) {
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, threadPool);
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getRepositories(env, namedXContentRegistry, clusterService);
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
if (factories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Repository type [" + entry.getKey() + "] is already registered");
@ -56,7 +56,7 @@ public final class RepositoriesModule {
Map<String, Repository.Factory> internalFactories = new HashMap<>();
for (RepositoryPlugin repoPlugin : repoPlugins) {
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, threadPool);
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry, clusterService);
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered");

View File

@ -287,8 +287,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
@Override
public void applyClusterState(ClusterChangedEvent event) {
try {
final ClusterState state = event.state();
RepositoriesMetaData oldMetaData = event.previousState().getMetaData().custom(RepositoriesMetaData.TYPE);
RepositoriesMetaData newMetaData = event.state().getMetaData().custom(RepositoriesMetaData.TYPE);
RepositoriesMetaData newMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE);
// Check if repositories got changed
if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equals(newMetaData))) {
@ -344,6 +345,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
}
}
for (Repository repo : builder.values()) {
repo.updateState(state);
}
repositories = Collections.unmodifiableMap(builder);
} catch (Exception ex) {
logger.warn("failure updating cluster state ", ex);
@ -411,11 +415,13 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
throw new RepositoryException(repositoryMetaData.name(),
"repository type [" + repositoryMetaData.type() + "] does not exist");
}
Repository repository = null;
try {
Repository repository = factory.create(repositoryMetaData, factories::get);
repository = factory.create(repositoryMetaData, factories::get);
repository.start();
return repository;
} catch (Exception e) {
IOUtils.closeWhileHandlingException(repository);
logger.warn(new ParameterizedMessage("failed to create repository [{}][{}]",
repositoryMetaData.type(), repositoryMetaData.name()), e);
throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e);

View File

@ -20,6 +20,7 @@ package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@ -238,5 +239,12 @@ public interface Repository extends LifecycleComponent {
*/
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId);
/**
* Update the repository with the incoming cluster state. This method is invoked from {@link RepositoriesService#applyClusterState} and
* thus the same semantics as with {@link org.elasticsearch.cluster.ClusterStateApplier#applyClusterState} apply for the
* {@link ClusterState} that is passed here.
*
* @param state new cluster state
*/
void updateState(ClusterState state);
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
/**
* Coordinates of an operation that modifies a repository, assuming that repository at a specific generation.
*/
public interface RepositoryOperation {
/**
* Name of the repository affected.
*/
String repository();
/**
* The repository state id at the time the operation began.
*/
long repositoryStateId();
}

View File

@ -35,10 +35,15 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Strings;
@ -88,6 +93,7 @@ import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.repositories.ShardGenerations;
@ -201,17 +207,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
/**
* Constructs new BlobStoreRepository
* @param metadata The metadata for this repository including name and settings
* @param threadPool Threadpool to run long running repository manipulations on asynchronously
* @param clusterService ClusterService
*/
protected BlobStoreRepository(
final RepositoryMetaData metadata,
final boolean compress,
final NamedXContentRegistry namedXContentRegistry,
final ThreadPool threadPool) {
final ClusterService clusterService) {
this.compress = compress;
this.metadata = metadata;
this.namedXContentRegistry = namedXContentRegistry;
this.threadPool = threadPool;
this.threadPool = clusterService.getClusterApplierService().threadPool();
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
readOnly = metadata.settings().getAsBoolean("readonly", false);
@ -256,6 +262,41 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
// Inspects all cluster state elements that contain a hint about what the current repository generation is and updates
// #latestKnownRepoGen if a newer than currently known generation is found
@Override
public void updateState(ClusterState state) {
if (readOnly) {
// No need to waste cycles, no operations can run against a read-only repository
return;
}
long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN;
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null) {
bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries());
}
final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
// Don't use generation from the delete task if we already found a generation for an in progress snapshot.
// In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet exist
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) {
bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries());
}
final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) {
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
}
final long finalBestGen = bestGenerationFromCS;
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
}
private long bestGeneration(Collection<? extends RepositoryOperation> operations) {
final String repoName = metadata.name();
assert operations.size() <= 1 : "Assumed one or no operations but received " + operations;
return operations.stream().filter(e -> e.repository().equals(repoName)).mapToLong(RepositoryOperation::repositoryStateId)
.max().orElse(RepositoryData.EMPTY_REPO_GEN);
}
public ThreadPool threadPool() {
return threadPool;
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories.fs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
@ -32,7 +33,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.nio.file.Path;
import java.util.function.Function;
@ -75,8 +75,8 @@ public class FsRepository extends BlobStoreRepository {
* Constructs a shared file system repository.
*/
public FsRepository(RepositoryMetaData metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
super(metadata, calculateCompress(metadata, environment), namedXContentRegistry, threadPool);
ClusterService clusterService) {
super(metadata, calculateCompress(metadata, environment), namedXContentRegistry, clusterService);
this.environment = environment;
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
if (location.isEmpty()) {

View File

@ -587,7 +587,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
ExceptionsHelper.stackTrace(exception),
0,
Collections.emptyList(),
snapshot.getRepositoryStateId(),
snapshot.repositoryStateId(),
snapshot.includeGlobalState(),
metaDataForSnapshot(snapshot, clusterService.state().metaData()),
snapshot.userMetadata(),
@ -796,7 +796,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster";
SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0);
deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId(),
deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.repositoryStateId(),
state.nodes().getMinNodeVersion());
}
}
@ -866,7 +866,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void onFailure(Exception e) {
logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot());
}
}, updatedSnapshot.getRepositoryStateId(), false);
}, updatedSnapshot.repositoryStateId(), false);
}
assert updatedSnapshot.shards().size() == snapshot.shards().size()
: "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]";
@ -1051,7 +1051,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
failure,
entry.shards().size(),
unmodifiableList(shardFailures),
entry.getRepositoryStateId(),
entry.repositoryStateId(),
entry.includeGlobalState(),
metaDataForSnapshot(entry, metaData),
entry.userMetadata(),
@ -1177,7 +1177,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (matchedInProgress.isPresent()) {
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
// Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes
repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L;
repoGenId = matchedInProgress.get().repositoryStateId() + 1L;
}
}
if (matchedEntry.isPresent() == false) {

View File

@ -44,6 +44,7 @@ public class RepositoriesModuleTests extends ESTestCase {
private RepositoryPlugin plugin2;
private Repository.Factory factory;
private ThreadPool threadPool;
private ClusterService clusterService;
@Override
public void setUp() throws Exception {
@ -51,6 +52,7 @@ public class RepositoriesModuleTests extends ESTestCase {
environment = mock(Environment.class);
contentRegistry = mock(NamedXContentRegistry.class);
threadPool = mock(ThreadPool.class);
clusterService = mock(ClusterService.class);
plugin1 = mock(RepositoryPlugin.class);
plugin2 = mock(RepositoryPlugin.class);
factory = mock(Repository.Factory.class);
@ -60,8 +62,8 @@ public class RepositoriesModuleTests extends ESTestCase {
}
public void testCanRegisterTwoRepositoriesWithDifferentTypes() {
when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type2", factory));
when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type2", factory));
// Would throw
new RepositoriesModule(
@ -69,37 +71,37 @@ public class RepositoriesModuleTests extends ESTestCase {
}
public void testCannotRegisterTwoRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory));
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService,
threadPool, contentRegistry));
assertEquals("Repository type [type1] is already registered", ex.getMessage());
}
public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() {
when(plugin1.getInternalRepositories(environment, contentRegistry, threadPool))
when(plugin1.getInternalRepositories(environment, contentRegistry, clusterService))
.thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool))
when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService))
.thenReturn(Collections.singletonMap("type1", factory));
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService,
threadPool, contentRegistry));
assertEquals("Internal repository type [type1] is already registered", ex.getMessage());
}
public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry, threadPool)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry, threadPool))
when(plugin1.getRepositories(environment, contentRegistry, clusterService)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry, clusterService))
.thenReturn(Collections.singletonMap("type1", factory));
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
threadPool, contentRegistry));
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), clusterService, threadPool,
contentRegistry));
assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage());
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@ -219,6 +220,10 @@ public class RepositoriesServiceTests extends ESTestCase {
return null;
}
@Override
public void updateState(final ClusterState state) {
}
@Override
public Lifecycle.State lifecycleState() {
return null;

View File

@ -193,7 +193,8 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool) {
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
BlobStoreTestUtil.mockClusterService()) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -42,7 +43,6 @@ import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.nio.file.Path;
import java.util.Arrays;
@ -72,9 +72,9 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap(REPO_TYPE,
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, threadPool) {
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we access blobStore on test/main threads
@ -228,7 +228,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
Environment useCompressEnvironment =
new Environment(useCompressSettings, node().getEnvironment().configFile());
new FsRepository(metaData, useCompressEnvironment, null, null);
new FsRepository(metaData, useCompressEnvironment, null, BlobStoreTestUtil.mockClusterService());
assertWarnings("[repositories.fs.compress] setting was deprecated in Elasticsearch and will be removed in a future release!" +
" See the breaking changes documentation for the next major version.");

View File

@ -54,6 +54,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.DummyShardLock;
@ -90,7 +91,8 @@ public class FsRepositoryTests extends ESTestCase {
int numDocs = indexDocs(directory);
RepositoryMetaData metaData = new RepositoryMetaData("test", "fs", settings);
FsRepository repository = new FsRepository(metaData, new Environment(settings, null), NamedXContentRegistry.EMPTY, threadPool);
FsRepository repository = new FsRepository(metaData, new Environment(settings, null), NamedXContentRegistry.EMPTY,
BlobStoreTestUtil.mockClusterService());
repository.start();
final Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_INDEX_UUID, "myindexUUID").build();
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings);

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRe
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
@ -35,7 +36,6 @@ import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Collection;
@ -188,8 +188,8 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
public CountingMockRepository(final RepositoryMetaData metadata,
final Environment environment,
final NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
super(metadata, environment, namedXContentRegistry, threadPool);
final NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
super(metadata, environment, namedXContentRegistry, clusterService);
}
@Override
@ -209,9 +209,9 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
public static class CountingMockRepositoryPlugin extends MockRepository.Plugin {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap("coutingmock",
metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, threadPool));
metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, clusterService));
}
}
}

View File

@ -1247,23 +1247,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
private Repository.Factory getRepoFactory(Environment environment) {
// Run half the tests with the eventually consistent repository
if (blobStoreContext == null) {
return metaData -> {
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo in the test thread
}
};
repository.start();
return repository;
return metaData -> new FsRepository(metaData, environment, xContentRegistry(), clusterService) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo in the test thread
}
};
} else {
return metaData -> {
final Repository repository = new MockEventuallyConsistentRepository(
metaData, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext, random());
repository.start();
return repository;
};
return metaData ->
new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random());
}
}
public void restart() {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.snapshots.mockstore;
import org.apache.lucene.codecs.CodecUtil;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
@ -36,7 +37,6 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -74,10 +74,10 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
public MockEventuallyConsistentRepository(
final RepositoryMetaData metadata,
final NamedXContentRegistry namedXContentRegistry,
final ThreadPool threadPool,
final ClusterService clusterService,
final Context context,
final Random random) {
super(metadata, false, namedXContentRegistry, threadPool);
super(metadata, false, namedXContentRegistry, clusterService);
this.context = context;
this.namedXContentRegistry = namedXContentRegistry;
this.random = random;

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.snapshots.mockstore;
import org.apache.lucene.util.SameThreadExecutorService;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@ -27,10 +26,10 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -41,8 +40,6 @@ import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
@ -50,7 +47,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
@ -70,7 +67,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
@ -86,7 +83,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
repository.start();
final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
@ -104,7 +101,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
repository.start();
final BlobContainer container = repository.blobStore().blobContainer(repository.basePath());
final String blobName = randomAlphaOfLength(10);
@ -121,7 +118,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), mock(ThreadPool.class), blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
repository.start();
final BlobContainer container =
repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0"));
@ -137,13 +134,9 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
public void testOverwriteSnapshotInfoBlob() {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10)));
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
xContentRegistry(), threadPool, blobStoreContext, random())) {
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
repository.start();
// We create a snap- blob for snapshot "foo" in the first generation

View File

@ -20,6 +20,7 @@ package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@ -146,4 +147,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
@Override
public void verify(String verificationToken, DiscoveryNode localNode) {
}
@Override
public void updateState(final ClusterState state) {
}
}

View File

@ -18,9 +18,16 @@
*/
package org.elasticsearch.repositories.blobstore;
import org.apache.lucene.util.SameThreadExecutorService;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
@ -52,9 +59,11 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
@ -66,6 +75,11 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public final class BlobStoreTestUtil {
@ -275,4 +289,40 @@ public final class BlobStoreTestUtil {
}
}
}
/**
* Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary
* functionality to make {@link BlobStoreRepository} work.
*
* @return Mock ClusterService
*/
public static ClusterService mockClusterService() {
final ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10)));
final ClusterService clusterService = mock(ClusterService.class);
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
final AtomicReference<ClusterState> currentState = new AtomicReference<>(ClusterState.EMPTY_STATE);
when(clusterService.state()).then(invocationOnMock -> currentState.get());
final List<ClusterStateApplier> appliers = new CopyOnWriteArrayList<>();
doAnswer(invocation -> {
final ClusterStateUpdateTask task = ((ClusterStateUpdateTask) invocation.getArguments()[1]);
final ClusterState current = currentState.get();
final ClusterState next = task.execute(current);
currentState.set(next);
appliers.forEach(applier -> applier.applyClusterState(
new ClusterChangedEvent((String) invocation.getArguments()[0], next, current)));
task.clusterStateProcessed((String) invocation.getArguments()[0], current, next);
return null;
}).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class));
doAnswer(invocation -> {
appliers.add((ClusterStateApplier) invocation.getArguments()[0]);
return null;
}).when(clusterService).addStateApplier(any(ClusterStateApplier.class));
when(clusterApplierService.threadPool()).thenReturn(threadPool);
return clusterService;
}
}

View File

@ -25,6 +25,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
@ -40,7 +41,6 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.InputStream;
@ -71,8 +71,9 @@ public class MockRepository extends FsRepository {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap("mock", (metadata) -> new MockRepository(metadata, env, namedXContentRegistry, threadPool));
ClusterService clusterService) {
return Collections.singletonMap("mock", (metadata) ->
new MockRepository(metadata, env, namedXContentRegistry, clusterService));
}
@Override
@ -113,8 +114,8 @@ public class MockRepository extends FsRepository {
private volatile boolean blocked = false;
public MockRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
super(overrideSettings(metadata, environment), environment, namedXContentRegistry, threadPool);
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService);
randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0);
randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false);

View File

@ -337,9 +337,10 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
Repository.Factory repositoryFactory =
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool);
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(),
clusterService.getClusterApplierService().threadPool());
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -428,6 +429,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
@Override
public void updateState(ClusterState state) {
}
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
Client followerClient, Index followerIndex) {
final PlainActionFuture<IndexMetaData> indexMetadataFuture = new PlainActionFuture<>();

View File

@ -356,7 +356,7 @@ public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin,
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -36,7 +37,6 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;
import java.io.IOException;
@ -71,7 +71,7 @@ public class SourceOnlySnapshotIT extends ESIntegTestCase {
public static final class MyPlugin extends Plugin implements RepositoryPlugin, EnginePlugin {
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap("source", SourceOnlySnapshotRepository.newRepositoryFactory());
}
@Override

View File

@ -61,6 +61,7 @@ import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.threadpool.ThreadPool;
@ -349,10 +350,10 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
}
/** Create a {@link Repository} with a random name **/
private Repository createRepository() throws IOException {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), threadPool);
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), BlobStoreTestUtil.mockClusterService());
}
private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {

View File

@ -417,19 +417,21 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
HashMap<String, Repository.Factory> repositories = new HashMap<>(super.getRepositories(env, namedXContentRegistry, threadPool));
filterPlugins(RepositoryPlugin.class).forEach(r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, threadPool)));
ClusterService clusterService) {
HashMap<String, Repository.Factory> repositories =
new HashMap<>(super.getRepositories(env, namedXContentRegistry, clusterService));
filterPlugins(RepositoryPlugin.class).forEach(
r -> repositories.putAll(r.getRepositories(env, namedXContentRegistry, clusterService)));
return repositories;
}
@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
HashMap<String, Repository.Factory> internalRepositories =
new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, threadPool));
new HashMap<>(super.getInternalRepositories(env, namedXContentRegistry, clusterService));
filterPlugins(RepositoryPlugin.class).forEach(r ->
internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, threadPool)));
internalRepositories.putAll(r.getInternalRepositories(env, namedXContentRegistry, clusterService)));
return internalRepositories;
}