Enable engine factory to be pluggable (#31183)

This commit enables the engine factory to be pluggable based on index
settings used when creating the index service for an index.
This commit is contained in:
Jason Tedor 2018-06-07 17:01:06 -04:00 committed by GitHub
parent d8c0a39c15
commit e481b860a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 350 additions and 76 deletions

View File

@ -19,8 +19,8 @@
package org.elasticsearch.index;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
@ -60,6 +60,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
@ -104,8 +105,7 @@ public final class IndexModule {
private final IndexSettings indexSettings;
private final AnalysisRegistry analysisRegistry;
// pkg private so tests can mock
final SetOnce<EngineFactory> engineFactory = new SetOnce<>();
private final EngineFactory engineFactory;
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
@ -115,9 +115,18 @@ public final class IndexModule {
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
private final AtomicBoolean frozen = new AtomicBoolean(false);
public IndexModule(IndexSettings indexSettings, AnalysisRegistry analysisRegistry) {
/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
* via {@link org.elasticsearch.plugins.PluginsService#onIndexModule(IndexModule)}.
*
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
*/
public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry, final EngineFactory engineFactory) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
}
@ -158,6 +167,15 @@ public final class IndexModule {
return indexSettings.getIndex();
}
/**
* The engine factory provided during construction of this index module.
*
* @return the engine factory
*/
EngineFactory getEngineFactory() {
return engineFactory;
}
/**
* Adds an {@link IndexEventListener} for this index. All listeners added here
* are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these
@ -364,7 +382,7 @@ public final class IndexModule {
}
return new IndexService(indexSettings, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService,
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
}

View File

@ -139,7 +139,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
SimilarityService similarityService,
ShardStoreDeleter shardStoreDeleter,
AnalysisRegistry registry,
@Nullable EngineFactory engineFactory,
EngineFactory engineFactory,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
@ -188,7 +188,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.warmer = new IndexWarmer(indexSettings.getSettings(), threadPool, indexFieldData,
bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
this.engineFactory = engineFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
@ -681,9 +681,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
void addPendingDelete(ShardId shardId, IndexSettings indexSettings);
}
final EngineFactory getEngineFactory() {
public final EngineFactory getEngineFactory() {
return engineFactory;
} // pkg private for testing
}
final IndexSearcherWrapper getSearcherWrapper() {
return searcherWrapper;

View File

@ -82,7 +82,6 @@ import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
@ -194,7 +193,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile IndexShardState state;
protected volatile long primaryTerm;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineFactory engineFactory;
final EngineFactory engineFactory;
private final IndexingOperationListener indexingOperationListeners;
private final Runnable globalCheckpointSyncer;
@ -267,7 +266,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.warmer = warmer;
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.store = store;
this.indexSortSupplier = indexSortSupplier;
this.indexEventListener = indexEventListener;

View File

@ -27,6 +27,8 @@ import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.common.geo.ShapesAvailability;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.BinaryFieldMapper;
import org.elasticsearch.index.mapper.BooleanFieldMapper;
import org.elasticsearch.index.mapper.CompletionFieldMapper;
@ -60,10 +62,12 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.plugins.MapperPlugin;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
@ -234,4 +238,9 @@ public class IndicesModule extends AbstractModule {
public MapperRegistry getMapperRegistry() {
return mapperRegistry;
}
public Collection<Function<IndexSettings, Optional<EngineFactory>>> getEngineFactories() {
return Collections.emptyList();
}
}

View File

@ -25,7 +25,6 @@ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceAlreadyExistsException;
@ -45,6 +44,7 @@ import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -67,6 +67,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
@ -79,6 +80,8 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
@ -116,10 +119,14 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -172,6 +179,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final IndicesRequestCache indicesRequestCache;
private final IndicesQueryCache indicesQueryCache;
private final MetaStateService metaStateService;
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
@Override
protected void doStart() {
@ -183,7 +191,8 @@ public class IndicesService extends AbstractLifecycleComponent
AnalysisRegistry analysisRegistry, IndexNameExpressionResolver indexNameExpressionResolver,
MapperRegistry mapperRegistry, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadPool,
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ScriptService scriptService, Client client, MetaStateService metaStateService) {
ScriptService scriptService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders) {
super(settings);
this.threadPool = threadPool;
this.pluginsService = pluginsService;
@ -214,6 +223,7 @@ public class IndicesService extends AbstractLifecycleComponent
this.cleanInterval = INDICES_CACHE_CLEAN_INTERVAL_SETTING.get(settings);
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
this.metaStateService = metaStateService;
this.engineFactoryProviders = engineFactoryProviders;
}
@Override
@ -442,7 +452,7 @@ public class IndicesService extends AbstractLifecycleComponent
idxSettings.getNumberOfReplicas(),
reason);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
@ -466,6 +476,34 @@ public class IndicesService extends AbstractLifecycleComponent
);
}
private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
final List<Optional<EngineFactory>> engineFactories =
engineFactoryProviders
.stream()
.map(engineFactoryProvider -> engineFactoryProvider.apply(idxSettings))
.filter(maybe -> Objects.requireNonNull(maybe).isPresent())
.collect(Collectors.toList());
if (engineFactories.isEmpty()) {
return new InternalEngineFactory();
} else if (engineFactories.size() == 1) {
assert engineFactories.get(0).isPresent();
return engineFactories.get(0).get();
} else {
final String message = String.format(
Locale.ROOT,
"multiple engine factories provided for %s: %s",
idxSettings.getIndex(),
engineFactories
.stream()
.map(t -> {
assert t.isPresent();
return "[" + t.get().getClass().getName() + "]";
})
.collect(Collectors.joining(",")));
throw new IllegalStateException(message);
}
}
/**
* creates a new mapper service for the given index, in order to do administrative work like mapping updates.
* This *should not* be used for document parsing. Doing so will result in an exception.
@ -474,7 +512,7 @@ public class IndicesService extends AbstractLifecycleComponent
*/
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopedSettings);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.node;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
@ -58,12 +57,10 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Binder;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Key;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.DeprecationLogger;
@ -82,6 +79,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
@ -93,8 +91,10 @@ import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
@ -109,10 +109,15 @@ import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
@ -140,10 +145,6 @@ import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.persistent.PersistentTasksService;
import java.io.BufferedWriter;
import java.io.Closeable;
@ -161,6 +162,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -395,11 +397,21 @@ public class Node implements Closeable {
.flatMap(Function.identity()).collect(toList()));
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),
client, metaStateService);
// collect engine factory providers from server and from plugins
final Collection<EnginePlugin> enginePlugins = pluginsService.filterPlugins(EnginePlugin.class);
final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders =
Stream.concat(
indicesModule.getEngineFactories().stream(),
enginePlugins.stream().map(plugin -> plugin::getEngineFactory))
.collect(Collectors.toList());
final IndicesService indicesService =
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders);
Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugins;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import java.util.Optional;
/**
* A plugin that provides alternative engine implementations.
*/
public interface EnginePlugin {
/**
* When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings to determine
* whether or not to provide an engine factory for the given index. A plugin that is not overriding the default engine should return
* {@link Optional#empty()}. If multiple plugins return an engine factory for a given index the index will not be created and an
* {@link IllegalStateException} will be thrown during index creation.
*
* @return an optional engine factory
*/
Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings);
}

View File

@ -30,7 +30,6 @@ import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.SetOnce.AlreadySetException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -40,6 +39,7 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
@ -50,6 +50,7 @@ import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.ParsedDocument;
@ -146,13 +147,12 @@ public class IndexModuleTests extends ESTestCase {
}
public void testWrapperIsBound() throws IOException {
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new MockEngineFactory(AssertingDirectoryReader.class));
module.setSearcherWrapper((s) -> new Wrapper());
module.engineFactory.set(new MockEngineFactory(AssertingDirectoryReader.class));
IndexService indexService = newIndexService(module);
assertTrue(indexService.getSearcherWrapper() instanceof Wrapper);
assertSame(indexService.getEngineFactory(), module.engineFactory.get());
assertSame(indexService.getEngineFactory(), module.getEngineFactory());
indexService.close("simon says", false);
}
@ -165,7 +165,7 @@ public class IndexModuleTests extends ESTestCase {
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store")
.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory());
module.addIndexStore("foo_store", FooStore::new);
try {
module.addIndexStore("foo_store", FooStore::new);
@ -189,7 +189,7 @@ public class IndexModuleTests extends ESTestCase {
}
};
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory());
module.addIndexEventListener(eventListener);
IndexService indexService = newIndexService(module);
IndexSettings x = indexService.getIndexSettings();
@ -203,7 +203,8 @@ public class IndexModuleTests extends ESTestCase {
public void testListener() throws IOException {
Setting<Boolean> booleanSetting = Setting.boolSetting("index.foo.bar", false, Property.Dynamic, Property.IndexScope);
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings, booleanSetting), emptyAnalysisRegistry);
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings, booleanSetting);
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory());
Setting<Boolean> booleanSetting2 = Setting.boolSetting("index.foo.bar.baz", false, Property.Dynamic, Property.IndexScope);
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
module.addSettingsUpdateConsumer(booleanSetting, atomicBoolean::set);
@ -222,7 +223,8 @@ public class IndexModuleTests extends ESTestCase {
}
public void testAddIndexOperationListener() throws IOException {
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry);
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry, new InternalEngineFactory());
AtomicBoolean executed = new AtomicBoolean(false);
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
@ -252,7 +254,8 @@ public class IndexModuleTests extends ESTestCase {
}
public void testAddSearchOperationListener() throws IOException {
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry);
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry, new InternalEngineFactory());
AtomicBoolean executed = new AtomicBoolean(false);
SearchOperationListener listener = new SearchOperationListener() {
@ -279,13 +282,14 @@ public class IndexModuleTests extends ESTestCase {
}
public void testAddSimilarity() throws IOException {
Settings indexSettings = Settings.builder()
Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put("index.similarity.my_similarity.type", "test_similarity")
.put("index.similarity.my_similarity.key", "there is a key")
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry);
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
module.addSimilarity("test_similarity",
(providerSettings, indexCreatedVersion, scriptService) -> new TestSimilarity(providerSettings.get("key")));
@ -299,7 +303,8 @@ public class IndexModuleTests extends ESTestCase {
}
public void testFrozen() {
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry);
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), emptyAnalysisRegistry, new InternalEngineFactory());
module.freeze();
String msg = "Can't modify IndexModule once the index service has been created";
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSearchOperationListener(null)).getMessage());
@ -312,32 +317,35 @@ public class IndexModuleTests extends ESTestCase {
}
public void testSetupUnknownSimilarity() throws IOException {
Settings indexSettings = Settings.builder()
Settings settings = Settings.builder()
.put("index.similarity.my_similarity.type", "test_similarity")
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry);
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
Exception ex = expectThrows(IllegalArgumentException.class, () -> newIndexService(module));
assertEquals("Unknown Similarity type [test_similarity] for [my_similarity]", ex.getMessage());
}
public void testSetupWithoutType() throws IOException {
Settings indexSettings = Settings.builder()
Settings settings = Settings.builder()
.put("index.similarity.my_similarity.foo", "bar")
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry);
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
Exception ex = expectThrows(IllegalArgumentException.class, () -> newIndexService(module));
assertEquals("Similarity [my_similarity] must have an associated type", ex.getMessage());
}
public void testForceCustomQueryCache() throws IOException {
Settings indexSettings = Settings.builder()
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry);
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()));
IndexService indexService = newIndexService(module);
@ -346,21 +354,23 @@ public class IndexModuleTests extends ESTestCase {
}
public void testDefaultQueryCacheImplIsSelected() throws IOException {
Settings indexSettings = Settings.builder()
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry);
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
IndexService indexService = newIndexService(module);
assertTrue(indexService.cache().query() instanceof IndexQueryCache);
indexService.close("simon says", false);
}
public void testDisableQueryCacheHasPrecedenceOverForceQueryCache() throws IOException {
Settings indexSettings = Settings.builder()
Settings settings = Settings.builder()
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings("foo", indexSettings), emptyAnalysisRegistry);
IndexModule module =
new IndexModule(IndexSettingsModule.newIndexSettings("foo", settings), emptyAnalysisRegistry, new InternalEngineFactory());
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
IndexService indexService = newIndexService(module);
assertTrue(indexService.cache().query() instanceof DisabledQueryCache);

View File

@ -60,6 +60,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@ -160,7 +161,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
protected EngineFactory getEngineFactory(ShardRouting routing) {
return null;
return new InternalEngineFactory();
}
public int indexDocs(final int numOfDoc) throws Exception {

View File

@ -38,6 +38,7 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
@ -406,7 +407,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
if (routing.primary()) {
return primaryEngineFactory;
} else {
return null;
return new InternalEngineFactory();
}
}
}) {
@ -500,7 +501,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
@Override
protected EngineFactory getEngineFactory(final ShardRouting routing) {
if (routing.primary()) {
return null;
return new InternalEngineFactory();
} else {
return replicaEngineFactory;
}

View File

@ -76,6 +76,7 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
@ -838,7 +839,8 @@ public class IndexShardTests extends IndexShardTestCase {
.build();
final IndexMetaData.Builder indexMetadata = IndexMetaData.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, null, () -> { synced.set(true); });
final IndexShard primaryShard =
newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(), () -> synced.set(true));
// add a replica
recoverShardFromStore(primaryShard);
final IndexShard replicaShard = newShard(shardId, false);
@ -1885,8 +1887,13 @@ public class IndexShardTests extends IndexShardTestCase {
};
closeShards(shard);
IndexShard newShard = newShard(
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}, EMPTY_EVENT_LISTENER);
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(),
shard.indexSettings().getIndexMetaData(),
wrapper,
new InternalEngineFactory(),
() -> {},
EMPTY_EVENT_LISTENER);
recoverShardFromStore(newShard);
@ -2032,8 +2039,13 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(shard);
IndexShard newShard = newShard(
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(), shard.indexSettings().getIndexMetaData(), wrapper, null, () -> {}, EMPTY_EVENT_LISTENER);
ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE),
shard.shardPath(),
shard.indexSettings().getIndexMetaData(),
wrapper,
new InternalEngineFactory(),
() -> {},
EMPTY_EVENT_LISTENER);
recoverShardFromStore(newShard);
@ -3016,7 +3028,7 @@ public class IndexShardTests extends IndexShardTestCase {
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
AtomicBoolean markedInactive = new AtomicBoolean();
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, () -> {
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, new InternalEngineFactory(), () -> {
}, new IndexEventListener() {
@Override
public void onShardInactive(IndexShard indexShard) {

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
@ -42,6 +43,11 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperService;
@ -51,10 +57,12 @@ import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService.ShardDeletionCheckResult;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.hamcrest.RegexMatcher;
import java.io.IOException;
import java.util.ArrayList;
@ -63,14 +71,18 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
@ -88,9 +100,70 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
plugins.add(TestPlugin.class);
return plugins;
return Stream.concat(
super.getPlugins().stream(),
Stream.of(TestPlugin.class, FooEnginePlugin.class, BarEnginePlugin.class))
.collect(Collectors.toList());
}
public static class FooEnginePlugin extends Plugin implements EnginePlugin {
static class FooEngineFactory implements EngineFactory {
@Override
public Engine newReadWriteEngine(final EngineConfig config) {
return new InternalEngine(config);
}
}
private static final Setting<Boolean> FOO_INDEX_SETTING =
Setting.boolSetting("index.foo_index", false, Setting.Property.IndexScope);
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(FOO_INDEX_SETTING);
}
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
if (FOO_INDEX_SETTING.get(indexSettings.getSettings())) {
return Optional.of(new FooEngineFactory());
} else {
return Optional.empty();
}
}
}
public static class BarEnginePlugin extends Plugin implements EnginePlugin {
static class BarEngineFactory implements EngineFactory {
@Override
public Engine newReadWriteEngine(final EngineConfig config) {
return new InternalEngine(config);
}
}
private static final Setting<Boolean> BAR_INDEX_SETTING =
Setting.boolSetting("index.bar_index", false, Setting.Property.IndexScope);
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(BAR_INDEX_SETTING);
}
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
if (BAR_INDEX_SETTING.get(indexSettings.getSettings())) {
return Optional.of(new BarEngineFactory());
} else {
return Optional.empty();
}
}
}
public static class TestPlugin extends Plugin implements MapperPlugin {
@ -438,4 +511,56 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
assertTrue(indicesService.isMetaDataField(builtIn));
}
}
public void testGetEngineFactory() throws IOException {
final IndicesService indicesService = getIndicesService();
final Boolean[] values = new Boolean[] { true, false, null };
for (final Boolean value : values) {
final String indexName = "foo-" + value;
final Index index = new Index(indexName, UUIDs.randomBase64UUID());
final Settings.Builder builder = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID());
if (value != null) {
builder.put(FooEnginePlugin.FOO_INDEX_SETTING.getKey(), value);
}
final IndexMetaData indexMetaData = new IndexMetaData.Builder(index.getName())
.settings(builder.build())
.numberOfShards(1)
.numberOfReplicas(0)
.build();
final IndexService indexService = indicesService.createIndex(indexMetaData, Collections.emptyList());
if (value != null && value) {
assertThat(indexService.getEngineFactory(), instanceOf(FooEnginePlugin.FooEngineFactory.class));
} else {
assertThat(indexService.getEngineFactory(), instanceOf(InternalEngineFactory.class));
}
}
}
public void testConflictingEngineFactories() throws IOException {
final String indexName = "foobar";
final Index index = new Index(indexName, UUIDs.randomBase64UUID());
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID())
.put(FooEnginePlugin.FOO_INDEX_SETTING.getKey(), true)
.put(BarEnginePlugin.BAR_INDEX_SETTING.getKey(), true)
.build();
final IndexMetaData indexMetaData = new IndexMetaData.Builder(index.getName())
.settings(settings)
.numberOfShards(1)
.numberOfReplicas(0)
.build();
final IndicesService indicesService = getIndicesService();
final IllegalStateException e =
expectThrows(IllegalStateException.class, () -> indicesService.createIndex(indexMetaData, Collections.emptyList()));
final String pattern =
".*multiple engine factories provided for \\[foobar/.*\\]: \\[.*FooEngineFactory\\],\\[.*BarEngineFactory\\].*";
assertThat(e, hasToString(new RegexMatcher(pattern)));
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexShardTestCase;
@ -99,8 +100,14 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
// build a new shard using the same store directory as the closed shard
ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(shard.routingEntry(), EXISTING_STORE_INSTANCE);
shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, () -> {},
EMPTY_EVENT_LISTENER);
shard = newShard(
shardRouting,
shard.shardPath(),
shard.indexSettings().getIndexMetaData(),
null,
new InternalEngineFactory(),
() -> {},
EMPTY_EVENT_LISTENER);
// restore the shard
recoverShardFromSnapshot(shard, snapshot, repository);

View File

@ -20,25 +20,23 @@ package org.elasticsearch.index;
import org.apache.lucene.index.AssertingDirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.engine.MockEngineFactory;
import org.elasticsearch.test.engine.MockEngineSupport;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
* A plugin to use {@link MockEngineFactory}.
*
* Subclasses may override the reader wrapper used.
*/
public class MockEngineFactoryPlugin extends Plugin {
public class MockEngineFactoryPlugin extends Plugin implements EnginePlugin {
@Override
public List<Setting<?>> getSettings() {
@ -46,8 +44,8 @@ public class MockEngineFactoryPlugin extends Plugin {
}
@Override
public void onIndexModule(IndexModule module) {
module.engineFactory.set(new MockEngineFactory(getReaderWrapperClass()));
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
return Optional.of(new MockEngineFactory(getReaderWrapperClass()));
}
protected Class<? extends FilterDirectoryReader> getReaderWrapperClass() {

View File

@ -56,6 +56,7 @@ import org.elasticsearch.index.cache.query.DisabledQueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
@ -227,7 +228,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
@Nullable IndexSearcherWrapper searcherWrapper, Runnable globalCheckpointSyncer) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, nodeId, primary, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, indexMetaData, searcherWrapper, null, globalCheckpointSyncer);
return newShard(shardRouting, indexMetaData, searcherWrapper, new InternalEngineFactory(), globalCheckpointSyncer);
}
@ -241,7 +242,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
*/
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners)
throws IOException {
return newShard(routing, indexMetaData, null, null, () -> {}, listeners);
return newShard(routing, indexMetaData, null, new InternalEngineFactory(), () -> {}, listeners);
}
/**

View File

@ -10,6 +10,7 @@ import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
@ -68,7 +69,7 @@ public class WatcherPluginTests extends ESTestCase {
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(Watch.INDEX, settings);
AnalysisRegistry registry = new AnalysisRegistry(TestEnvironment.newEnvironment(settings), emptyMap(), emptyMap(), emptyMap(),
emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap());
IndexModule indexModule = new IndexModule(indexSettings, registry);
IndexModule indexModule = new IndexModule(indexSettings, registry, new InternalEngineFactory());
// this will trip an assertion if the watcher indexing operation listener is null (which it is) but we try to add it
watcher.onIndexModule(indexModule);