Add the possibility to inject a custom RecoveryState factory to IndexStorePlugin implementations (#59124)

Add a custom factory for recovery state into IndexStorePlugin that
allows different implementors to provide its own RecoveryState
implementation.

Backport of #59038
This commit is contained in:
Francisco Fernández Castaño 2020-07-15 11:11:07 +02:00 committed by GitHub
parent bc11503dc3
commit 66ef1cdad7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 203 additions and 22 deletions

View File

@ -162,6 +162,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
IndexModule.INDEX_STORE_TYPE_SETTING,
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
IndexModule.INDEX_RECOVERY_TYPE_SETTING,
IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING,
FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING,
Store.FORCE_RAM_TERM_DICT,

View File

@ -59,6 +59,7 @@ import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
@ -100,9 +101,14 @@ public final class IndexModule {
private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory();
private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new;
public static final Setting<String> INDEX_STORE_TYPE_SETTING =
new Setting<>("index.store.type", "", Function.identity(), Property.IndexScope, Property.NodeScope);
public static final Setting<String> INDEX_RECOVERY_TYPE_SETTING =
new Setting<>("index.recovery.type", "", Function.identity(), Property.IndexScope, Property.NodeScope);
/** On which extensions to load data into the file-system cache upon opening of files.
* This only works with the mmap directory, and even in that case is still
* best-effort only. */
@ -134,6 +140,7 @@ public final class IndexModule {
private final IndexNameExpressionResolver expressionResolver;
private final AtomicBoolean frozen = new AtomicBoolean(false);
private final BooleanSupplier allowExpensiveQueries;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@ -150,7 +157,8 @@ public final class IndexModule {
final EngineFactory engineFactory,
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver) {
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
@ -159,6 +167,7 @@ public final class IndexModule {
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
}
/**
@ -410,6 +419,7 @@ public final class IndexModule {
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
QueryCache queryCache = null;
IndexAnalyzers indexAnalyzers = null;
boolean success = false;
@ -432,7 +442,7 @@ public final class IndexModule {
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver,
valuesSourceRegistry);
valuesSourceRegistry, recoveryStateFactory);
success = true;
return indexService;
} finally {
@ -471,6 +481,22 @@ public final class IndexModule {
return factory;
}
private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
final IndexSettings indexSettings, final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
final String recoveryType = indexSettings.getValue(INDEX_RECOVERY_TYPE_SETTING);
if (recoveryType.isEmpty()) {
return DEFAULT_RECOVERY_STATE_FACTORY;
}
IndexStorePlugin.RecoveryStateFactory factory = recoveryStateFactories.get(recoveryType);
if (factory == null) {
throw new IllegalArgumentException("Unknown recovery type [" + recoveryType + "]");
}
return factory;
}
/**
* creates a new mapper service to do administrative work like mapping updates. This *should not* be used for document parsing.
* doing so will result in an exception.

View File

@ -32,6 +32,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
@ -79,6 +80,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
@ -115,6 +117,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
private final MapperService mapperService;
@ -175,7 +178,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
BooleanSupplier idFieldDataEnabled,
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry) {
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
this.indexSettings = indexSettings;
@ -224,6 +228,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.directoryFactory = directoryFactory;
this.recoveryStateFactory = recoveryStateFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.readerWrapper = wrapperFactory.apply(this);
@ -564,6 +569,10 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
}
public RecoveryState createRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, DiscoveryNode sourceNode) {
return recoveryStateFactory.newRecoveryState(shardRouting, targetNode, sourceNode);
}
@Override
public IndexSettings getIndexSettings() {
return indexSettings;

View File

@ -222,6 +222,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final MetaStateService metaStateService;
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
final AbstractRefCounted indicesRefCount; // pkg-private for testing
private final CountDownLatch closeLatch = new CountDownLatch(1);
private volatile boolean idFieldDataEnabled;
@ -246,7 +247,8 @@ public class IndicesService extends AbstractLifecycleComponent
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ScriptService scriptService, ClusterService clusterService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry) {
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories, ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories) {
this.settings = settings;
this.threadPool = threadPool;
this.pluginsService = pluginsService;
@ -292,6 +294,7 @@ public class IndicesService extends AbstractLifecycleComponent
}
this.directoryFactories = directoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
@ -642,7 +645,7 @@ public class IndicesService extends AbstractLifecycleComponent
indexCreationContext);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver);
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, recoveryStateFactories);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
@ -713,7 +716,7 @@ public class IndicesService extends AbstractLifecycleComponent
public synchronized MapperService createIndexMapperService(IndexMetadata indexMetadata) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetadata, this.settings, indexScopedSettings);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver);
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, recoveryStateFactories);
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
}
@ -748,16 +751,19 @@ public class IndicesService extends AbstractLifecycleComponent
@Override
public IndexShard createShard(
final ShardRouting shardRouting,
final RecoveryState recoveryState,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
final RetentionLeaseSyncer retentionLeaseSyncer,
final DiscoveryNode targetNode,
final DiscoveryNode sourceNode) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
ensureChangesAllowed();
IndexService indexService = indexService(shardRouting.index());
assert indexService != null;
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService,

View File

@ -590,16 +590,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
try {
final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard(
shardRouting,
recoveryState,
recoveryTargetService,
new RecoveryListener(shardRouting, primaryTerm),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer,
retentionLeaseSyncer);
retentionLeaseSyncer,
nodes.getLocalNode(),
sourceNode);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);
}
@ -889,25 +889,27 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
* Creates a shard for the specified shard routing and starts recovery.
*
* @param shardRouting the shard routing
* @param recoveryState the recovery state
* @param recoveryTargetService recovery service for the target
* @param recoveryListener a callback when recovery changes state (finishes or fails)
* @param repositoriesService service responsible for snapshot/restore
* @param onShardFailure a callback when this shard fails
* @param globalCheckpointSyncer a callback when this shard syncs the global checkpoint
* @param retentionLeaseSyncer a callback when this shard syncs retention leases
* @param targetNode the node where this shard will be recovered
* @param sourceNode the source node to recover this shard from (it might be null)
* @return a new shard
* @throws IOException if an I/O exception occurs when creating the shard
*/
T createShard(
ShardRouting shardRouting,
RecoveryState recoveryState,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
RepositoriesService repositoriesService,
Consumer<IndexShard.ShardFailure> onShardFailure,
Consumer<ShardId> globalCheckpointSyncer,
RetentionLeaseSyncer retentionLeaseSyncer) throws IOException;
RetentionLeaseSyncer retentionLeaseSyncer,
DiscoveryNode targetNode,
@Nullable DiscoveryNode sourceNode) throws IOException;
/**
* Returns shard for the specified id if it exists otherwise returns <code>null</code>.

View File

@ -470,6 +470,13 @@ public class Node implements Closeable {
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getRecoveryStateFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final Map<String, Collection<SystemIndexDescriptor>> systemIndexDescriptorMap = Collections.unmodifiableMap(pluginsService
.filterPlugins(SystemIndexPlugin.class)
.stream()
@ -487,7 +494,7 @@ public class Node implements Closeable {
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService,
clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories,
searchModule.getValuesSourceRegistry());
searchModule.getValuesSourceRegistry(), recoveryStateFactories);
final AliasValidator aliasValidator = new AliasValidator();

View File

@ -20,10 +20,15 @@
package org.elasticsearch.plugins;
import org.apache.lucene.store.Directory;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.recovery.RecoveryState;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
/**
@ -55,4 +60,26 @@ public interface IndexStorePlugin {
*/
Map<String, DirectoryFactory> getDirectoryFactories();
/**
* An interface that allows to create a new {@link RecoveryState} per shard.
*/
@FunctionalInterface
interface RecoveryStateFactory {
/**
* Creates a new {@link RecoveryState} per shard. This method is called once per shard on shard creation.
* @return a new RecoveryState instance
*/
RecoveryState newRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode);
}
/**
* The {@link RecoveryStateFactory} mappings for this plugin. When an index is created the recovery type setting
* {@link org.elasticsearch.index.IndexModule#INDEX_RECOVERY_TYPE_SETTING} on the index will be examined and either use the default
* or looked up among all the recovery state factories from {@link IndexStorePlugin} plugins.
*
* @return a map from recovery type to an recovery state factory
*/
default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
return Collections.emptyMap();
}
}

View File

@ -35,6 +35,10 @@ import org.apache.lucene.util.SetOnce.AlreadySetException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.breaker.CircuitBreaker;
@ -77,6 +81,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.SearchContext;
@ -104,6 +109,8 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
public class IndexModuleTests extends ESTestCase {
private Index index;
@ -168,7 +175,13 @@ public class IndexModuleTests extends ESTestCase {
public void testWrapperIsBound() throws IOException {
final MockEngineFactory engineFactory = new MockEngineFactory(AssertingDirectoryReader.class);
IndexModule module = new IndexModule(
indexSettings, emptyAnalysisRegistry, engineFactory, Collections.emptyMap(), () -> true, new IndexNameExpressionResolver());
indexSettings,
emptyAnalysisRegistry,
engineFactory,
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(),
Collections.emptyMap());
module.setReaderWrapper(s -> new Wrapper());
IndexService indexService = newIndexService(module);
@ -189,7 +202,7 @@ public class IndexModuleTests extends ESTestCase {
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = singletonMap(
"foo_store", new FooFunction());
final IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), indexStoreFactories,
() -> true, new IndexNameExpressionResolver());
() -> true, new IndexNameExpressionResolver(), Collections.emptyMap());
final IndexService indexService = newIndexService(module);
assertThat(indexService.getDirectoryFactory(), instanceOf(FooFunction.class));
@ -485,9 +498,47 @@ public class IndexModuleTests extends ESTestCase {
assertThat(e, hasToString(containsString("store type [" + storeType + "] is not allowed")));
}
public void testRegisterCustomRecoveryStateFactory() throws IOException {
final Settings settings = Settings
.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexModule.INDEX_RECOVERY_TYPE_SETTING.getKey(), "test_recovery")
.build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
RecoveryState recoveryState = mock(RecoveryState.class);
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories = singletonMap(
"test_recovery", (shardRouting, targetNode, sourceNode) -> recoveryState);
final IndexModule module = new IndexModule(indexSettings,
emptyAnalysisRegistry,
new InternalEngineFactory(),
Collections.emptyMap(),
() -> true,
new IndexNameExpressionResolver(),
recoveryStateFactories);
final IndexService indexService = newIndexService(module);
ShardRouting shard = createInitializedShardRouting();
assertThat(indexService.createRecoveryState(shard, mock(DiscoveryNode.class), mock(DiscoveryNode.class)), is(recoveryState));
indexService.close("closing", false);
}
private ShardRouting createInitializedShardRouting() {
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true,
RecoverySource.ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard = shard.initialize("node1", null, -1);
return shard;
}
private static IndexModule createIndexModule(IndexSettings indexSettings, AnalysisRegistry emptyAnalysisRegistry) {
return new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap(), () -> true,
new IndexNameExpressionResolver());
new IndexNameExpressionResolver(), Collections.emptyMap());
}
class CustomQueryCache implements QueryCache {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.indices.cluster;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -230,14 +231,16 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
@Override
public MockIndexShard createShard(
final ShardRouting shardRouting,
final RecoveryState recoveryState,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
final RepositoriesService repositoriesService,
final Consumer<IndexShard.ShardFailure> onShardFailure,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
final RetentionLeaseSyncer retentionLeaseSyncer,
final DiscoveryNode targetNode,
final DiscoveryNode sourceNode) throws IOException {
failRandomly();
RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode);
MockIndexService indexService = indexService(recoveryState.getShardId().getIndex());
MockIndexShard indexShard = indexService.createShard(shardRouting);
indexShard.recoveryState = recoveryState;

View File

@ -20,9 +20,12 @@
package org.elasticsearch.plugins;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.test.ESTestCase;
@ -69,6 +72,38 @@ public class IndexStorePluginTests extends ESTestCase {
}
public static class FooCustomRecoveryStore extends Plugin implements IndexStorePlugin {
@Override
public Map<String, DirectoryFactory> getDirectoryFactories() {
return Collections.singletonMap("store-a", new FsDirectoryFactory());
}
@Override
public Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
return Collections.singletonMap("recovery-type", new RecoveryFactory());
}
}
public static class BarCustomRecoveryStore extends Plugin implements IndexStorePlugin {
@Override
public Map<String, DirectoryFactory> getDirectoryFactories() {
return Collections.singletonMap("store-b", new FsDirectoryFactory());
}
@Override
public Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
return Collections.singletonMap("recovery-type", new RecoveryFactory());
}
}
public static class RecoveryFactory implements IndexStorePlugin.RecoveryStateFactory {
@Override
public RecoveryState newRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, DiscoveryNode sourceNode) {
return new RecoveryState(shardRouting, targetNode, sourceNode);
}
}
public void testIndexStoreFactoryConflictsWithBuiltInIndexStoreType() {
final Settings settings = Settings.builder().put("path.home", createTempDir()).build();
final IllegalStateException e = expectThrows(
@ -92,4 +127,17 @@ public class IndexStorePluginTests extends ESTestCase {
}
}
public void testDuplicateIndexStoreRecoveryStateFactories() {
final Settings settings = Settings.builder().put("path.home", createTempDir()).build();
final IllegalStateException e = expectThrows(
IllegalStateException.class, () -> new MockNode(settings, Arrays.asList(FooCustomRecoveryStore.class,
BarCustomRecoveryStore.class)));
if (JavaVersion.current().compareTo(JavaVersion.parse("9")) >= 0) {
assertThat(e.getMessage(), containsString("Duplicate key recovery-type"));
} else {
assertThat(e, hasToString(matches(
"java.lang.IllegalStateException: Duplicate key " +
"org.elasticsearch.plugins.IndexStorePluginTests\\$RecoveryFactory@[\\w\\d]+")));
}
}
}

View File

@ -1513,7 +1513,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
new MetaStateService(nodeEnv, namedXContentRegistry),
Collections.emptyList(),
emptyMap(),
null
null,
emptyMap()
);
final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
snapshotShardsService =

View File

@ -79,7 +79,7 @@ public class WatcherPluginTests extends ESTestCase {
AnalysisRegistry registry = new AnalysisRegistry(TestEnvironment.newEnvironment(settings), emptyMap(), emptyMap(), emptyMap(),
emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap());
IndexModule indexModule = new IndexModule(indexSettings, registry, new InternalEngineFactory(), Collections.emptyMap(),
() -> true, new IndexNameExpressionResolver());
() -> true, new IndexNameExpressionResolver(), Collections.emptyMap());
// this will trip an assertion if the watcher indexing operation listener is null (which it is) but we try to add it
watcher.onIndexModule(indexModule);