From c0eca94a044eec90be859f28a6b0cd652c789714 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 30 Sep 2015 23:13:28 +0200 Subject: [PATCH] Remove shard-level injector Today we use a hirachical injector on the shard level for each shard created. This commit removes the shard level injetor and replaces it with good old constructor calls. This also removes all shard level plugin facilities such that plugins can only have node or index level modules. For plugins that need to track shard lifecycles they should use the relevant callback from the lifecycle we already provide. --- core/pom.xml | 2 +- .../org/elasticsearch/index/IndexModule.java | 14 +- .../org/elasticsearch/index/IndexService.java | 111 +++------------ .../index/IndexServicesProvider.java | 132 ++++++++++++++++++ .../elasticsearch/index/engine/Engine.java | 37 ++++- .../index/engine/EngineConfig.java | 14 +- .../index/engine/IndexSearcherWrapper.java | 2 +- .../engine/IndexSearcherWrappingService.java | 94 ------------- .../elasticsearch/index/shard/IndexShard.java | 42 +++--- .../index/shard/IndexShardModule.java | 16 +-- .../index/shard/ShadowIndexShard.java | 39 +----- .../elasticsearch/index/store/IndexStore.java | 6 +- .../elasticsearch/indices/IndicesService.java | 2 +- .../org/elasticsearch/plugins/Plugin.java | 7 - .../elasticsearch/plugins/PluginsService.java | 8 -- .../{shard => }/MockEngineFactoryPlugin.java | 7 +- .../index/engine/InternalEngineTests.java | 19 +-- .../index/engine/ShadowEngineTests.java | 2 +- .../indices/leaks/IndicesLeaksIT.java | 131 ----------------- .../test/InternalTestCluster.java | 2 +- .../test/store/MockFSIndexStore.java | 13 +- 21 files changed, 257 insertions(+), 443 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java delete mode 100644 core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java rename core/src/test/java/org/elasticsearch/index/{shard => }/MockEngineFactoryPlugin.java (88%) delete mode 100644 core/src/test/java/org/elasticsearch/indices/leaks/IndicesLeaksIT.java diff --git a/core/pom.xml b/core/pom.xml index a96e3746bbd..2a5f6deabd1 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -274,7 +274,7 @@ org/elasticsearch/common/cli/CliToolTestCase$*.class org/elasticsearch/cluster/MockInternalClusterInfoService.class org/elasticsearch/cluster/MockInternalClusterInfoService$*.class - org/elasticsearch/index/shard/MockEngineFactoryPlugin.class + org/elasticsearch/index/MockEngineFactoryPlugin.class org/elasticsearch/search/MockSearchService.class org/elasticsearch/search/MockSearchService$*.class org/elasticsearch/search/aggregations/bucket/AbstractTermsTestCase.class diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index d94eb4f9c7d..59bec88d81a 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -20,21 +20,21 @@ package org.elasticsearch.index; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngineFactory; /** * */ public class IndexModule extends AbstractModule { - private final Settings settings; - - public IndexModule(Settings settings) { - this.settings = settings; - } - + // pkg private so tests can mock + Class engineFactoryImpl = InternalEngineFactory.class; + @Override protected void configure() { + bind(EngineFactory.class).to(engineFactoryImpl); bind(IndexService.class).asEagerSingleton(); + bind(IndexServicesProvider.class).asEagerSingleton(); } } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 3c40b02a4b9..f3c330a182b 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -24,16 +24,11 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.inject.CreationException; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.common.inject.Injectors; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; @@ -49,15 +44,10 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardModule; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardNotFoundException; -import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.shard.*; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.store.StoreModule; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InternalIndicesLifecycle; @@ -110,25 +100,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone private final NodeEnvironment nodeEnv; private final IndicesService indicesServices; - private volatile ImmutableMap shards = ImmutableMap.of(); + private volatile ImmutableMap shards = ImmutableMap.of(); - private static class IndexShardInjectorPair { - private final IndexShard indexShard; - private final Injector injector; - - public IndexShardInjectorPair(IndexShard indexShard, Injector injector) { - this.indexShard = indexShard; - this.injector = injector; - } - - public IndexShard getIndexShard() { - return indexShard; - } - - public Injector getInjector() { - return injector; - } - } private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean deleted = new AtomicBoolean(false); @@ -173,7 +146,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone @Override public Iterator iterator() { - return shards.values().stream().map((p) -> p.getIndexShard()).iterator(); + return shards.values().iterator(); } public boolean hasShard(int shardId) { @@ -185,11 +158,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone */ @Nullable public IndexShard shard(int shardId) { - IndexShardInjectorPair indexShardInjectorPair = shards.get(shardId); - if (indexShardInjectorPair != null) { - return indexShardInjectorPair.getIndexShard(); - } - return null; + return shards.get(shardId); } /** @@ -261,16 +230,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } - /** - * Return the shard injector for the provided id, or throw an exception if there is no such shard. - */ - public Injector shardInjectorSafe(int shardId) { - IndexShardInjectorPair indexShardInjectorPair = shards.get(shardId); - if (indexShardInjectorPair == null) { - throw new ShardNotFoundException(new ShardId(index, shardId)); - } - return indexShardInjectorPair.getInjector(); - } public String indexUUID() { return indexSettings.get(IndexMetaData.SETTING_INDEX_UUID, IndexMetaData.INDEX_UUID_NA_VALUE); @@ -304,7 +263,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone final ShardId shardId = new ShardId(index, sShardId); ShardLock lock = null; boolean success = false; - Injector shardInjector = null; + Store store = null; + IndexShard indexShard = null; try { lock = nodeEnv.shardLock(shardId, TimeUnit.SECONDS.toMillis(5)); indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings); @@ -351,38 +311,18 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary. final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false || (primary && IndexMetaData.isOnSharedFilesystem(indexSettings)); - ModulesBuilder modules = new ModulesBuilder(); - // plugin modules must be added here, before others or we can get crazy injection errors... - for (Module pluginModule : pluginsService.shardModules(indexSettings)) { - modules.add(pluginModule); - } - modules.add(new IndexShardModule(shardId, primary, indexSettings)); - modules.add(new StoreModule(injector.getInstance(IndexStore.class).shardDirectory(), lock, - new StoreCloseListener(shardId, canDeleteShardContent, new Closeable() { - @Override - public void close() throws IOException { - injector.getInstance(IndicesQueryCache.class).onClose(shardId); - } - }), path)); - pluginsService.processModules(modules); - - try { - shardInjector = modules.createChildInjector(injector); - } catch (CreationException e) { - ElasticsearchException ex = new ElasticsearchException("failed to create shard", Injectors.getFirstErrorFailure(e)); - ex.setShard(shardId); - throw ex; - } catch (Throwable e) { - ElasticsearchException ex = new ElasticsearchException("failed to create shard", e); - ex.setShard(shardId); - throw ex; + IndexStore indexStore = injector.getInstance(IndexStore.class); + store = new Store(shardId, indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> injector.getInstance(IndicesQueryCache.class).onClose(shardId))); + if (primary && IndexMetaData.isIndexUsingShadowReplicas(indexSettings)) { + indexShard = new ShadowIndexShard(shardId, indexSettings, path, store, injector.getInstance(IndexServicesProvider.class)); + } else { + indexShard = new IndexShard(shardId, indexSettings, path, store, injector.getInstance(IndexServicesProvider.class)); } - IndexShard indexShard = shardInjector.getInstance(IndexShard.class); indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created"); indicesLifecycle.afterIndexShardCreated(indexShard); - shards = newMapBuilder(shards).put(shardId.id(), new IndexShardInjectorPair(indexShard, shardInjector)).immutableMap(); + shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); settingsService.addListener(indexShard); success = true; return indexShard; @@ -393,10 +333,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } finally { if (success == false) { IOUtils.closeWhileHandlingException(lock); - if (shardInjector != null) { - IndexShard indexShard = shardInjector.getInstance(IndexShard.class); - closeShardInjector("initialization failed", shardId, shardInjector, indexShard); - } + closeShard("initialization failed", shardId, indexShard, store); } } } @@ -409,29 +346,19 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone return; } logger.debug("[{}] closing... (reason: [{}])", shardId, reason); - HashMap tmpShardsMap = new HashMap<>(shards); - IndexShardInjectorPair indexShardInjectorPair = tmpShardsMap.remove(shardId); - indexShard = indexShardInjectorPair.getIndexShard(); - shardInjector = indexShardInjectorPair.getInjector(); + HashMap tmpShardsMap = new HashMap<>(shards); + indexShard = tmpShardsMap.remove(shardId); shards = ImmutableMap.copyOf(tmpShardsMap); - closeShardInjector(reason, sId, shardInjector, indexShard); + closeShard(reason, sId, indexShard, indexShard.store()); logger.debug("[{}] closed (reason: [{}])", shardId, reason); } - private void closeShardInjector(String reason, ShardId sId, Injector shardInjector, IndexShard indexShard) { + private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store) { final int shardId = sId.id(); try { try { indicesLifecycle.beforeIndexShardClosed(sId, indexShard, indexSettings); } finally { - // close everything else even if the beforeIndexShardClosed threw an exception - for (Class closeable : pluginsService.shardServices()) { - try { - shardInjector.getInstance(closeable).close(); - } catch (Throwable e) { - logger.debug("[{}] failed to clean plugin shard service [{}]", e, shardId, closeable); - } - } // this logic is tricky, we want to close the engine so we rollback the changes done to it // and close the shard so no operations are allowed to it if (indexShard != null) { @@ -449,7 +376,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone } } finally { try { - shardInjector.getInstance(Store.class).close(); + store.close(); } catch (Throwable e) { logger.warn("[{}] failed to close store on shard removal (reason: [{}])", e, shardId, reason); } diff --git a/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java b/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java new file mode 100644 index 00000000000..ad136c58dd6 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/IndexServicesProvider.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.index.aliases.IndexAliasesService; +import org.elasticsearch.index.cache.IndexCache; +import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.fielddata.IndexFieldDataService; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.query.IndexQueryParserService; +import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.index.termvectors.TermVectorsService; +import org.elasticsearch.indices.IndicesLifecycle; +import org.elasticsearch.indices.IndicesWarmer; +import org.elasticsearch.indices.cache.query.IndicesQueryCache; +import org.elasticsearch.threadpool.ThreadPool; + +/** + * Simple provider class that holds the Index and Node level services used by + * a shard. + * This is just a temporary solution until we cleaned up index creation and removed injectors on that level as well. + */ +public final class IndexServicesProvider { + + private final IndicesLifecycle indicesLifecycle; + private final ThreadPool threadPool; + private final MapperService mapperService; + private final IndexQueryParserService queryParserService; + private final IndexCache indexCache; + private final IndexAliasesService indexAliasesService; + private final IndicesQueryCache indicesQueryCache; + private final CodecService codecService; + private final TermVectorsService termVectorsService; + private final IndexFieldDataService indexFieldDataService; + private final IndicesWarmer warmer; + private final SimilarityService similarityService; + private final EngineFactory factory; + private final BigArrays bigArrays; + + @Inject + public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays) { + this.indicesLifecycle = indicesLifecycle; + this.threadPool = threadPool; + this.mapperService = mapperService; + this.queryParserService = queryParserService; + this.indexCache = indexCache; + this.indexAliasesService = indexAliasesService; + this.indicesQueryCache = indicesQueryCache; + this.codecService = codecService; + this.termVectorsService = termVectorsService; + this.indexFieldDataService = indexFieldDataService; + this.warmer = warmer; + this.similarityService = similarityService; + this.factory = factory; + this.bigArrays = bigArrays; + } + + public IndicesLifecycle getIndicesLifecycle() { + return indicesLifecycle; + } + + public ThreadPool getThreadPool() { + return threadPool; + } + + public MapperService getMapperService() { + return mapperService; + } + + public IndexQueryParserService getQueryParserService() { + return queryParserService; + } + + public IndexCache getIndexCache() { + return indexCache; + } + + public IndexAliasesService getIndexAliasesService() { + return indexAliasesService; + } + + public IndicesQueryCache getIndicesQueryCache() { + return indicesQueryCache; + } + + public CodecService getCodecService() { + return codecService; + } + + public TermVectorsService getTermVectorsService() { + return termVectorsService; + } + + public IndexFieldDataService getIndexFieldDataService() { + return indexFieldDataService; + } + + public IndicesWarmer getWarmer() { + return warmer; + } + + public SimilarityService getSimilarityService() { + return similarityService; + } + + public EngineFactory getFactory() { + return factory; + } + + public BigArrays getBigArrays() { + return bigArrays; + } +} diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index f9331c4416a..dc35b95d2c1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -26,6 +26,7 @@ import org.apache.lucene.search.SearcherManager; import org.apache.lucene.search.join.BitSetProducer; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Base64; import org.elasticsearch.common.Nullable; @@ -78,6 +79,7 @@ public abstract class Engine implements Closeable { protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock()); protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock()); + private final IndexSearcherWrapper searcherWrapper; protected volatile Throwable failedEngine = null; @@ -92,6 +94,7 @@ public abstract class Engine implements Closeable { engineConfig.getIndexSettings(), engineConfig.getShardId()); this.failedEngineListener = engineConfig.getFailedEngineListener(); this.deletionPolicy = engineConfig.getDeletionPolicy(); + this.searcherWrapper = engineConfig.getSearcherWrapper(); } /** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */ @@ -279,7 +282,7 @@ public abstract class Engine implements Closeable { try { final Searcher retVal = newSearcher(source, searcher, manager); success = true; - return config().getWrappingService().wrap(engineConfig, retVal); + return wrap(engineConfig, retVal); } finally { if (!success) { manager.release(searcher); @@ -298,6 +301,38 @@ public abstract class Engine implements Closeable { } } + /** + * If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher + * gets wrapped and a new {@link Searcher} instances is returned, otherwise the provided {@link Searcher} is returned. + * + * This is invoked each time a {@link Searcher} is requested to do an operation. (for example search) + */ + private Searcher wrap(EngineConfig engineConfig, final Searcher engineSearcher) throws EngineException { + if (searcherWrapper == null) { + return engineSearcher; + } + + DirectoryReader reader = searcherWrapper.wrap((DirectoryReader) engineSearcher.reader()); + IndexSearcher innerIndexSearcher = new IndexSearcher(reader); + innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); + innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); + innerIndexSearcher.setSimilarity(engineConfig.getSimilarity()); + // TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point + // For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten + // This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times + IndexSearcher indexSearcher = searcherWrapper.wrap(engineConfig, innerIndexSearcher); + if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) { + return engineSearcher; + } else { + return new Engine.Searcher(engineSearcher.source(), indexSearcher) { + @Override + public void close() throws ElasticsearchException { + engineSearcher.close(); + } + }; + } + } + /** returns the translog for this engine */ public abstract Translog getTranslog(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index c6e67243514..7d57bb5b0b3 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.search.QueryCache; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.similarities.Similarity; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -73,7 +74,7 @@ public final class EngineConfig { private final boolean forceNewTranslog; private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; - private final IndexSearcherWrappingService wrappingService; + private final SetOnce searcherWrapper = new SetOnce<>(); /** * Index setting for compound file on flush. This setting is realtime updateable. @@ -121,7 +122,7 @@ public final class EngineConfig { Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, - TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, IndexSearcherWrappingService wrappingService, TranslogConfig translogConfig) { + TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -135,7 +136,6 @@ public final class EngineConfig { this.similarity = similarity; this.codecService = codecService; this.failedEngineListener = failedEngineListener; - this.wrappingService = wrappingService; this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush); codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME); indexingBufferSize = DEFAULT_INDEX_BUFFER_SIZE; @@ -380,8 +380,12 @@ public final class EngineConfig { return queryCachingPolicy; } - public IndexSearcherWrappingService getWrappingService() { - return wrappingService; + IndexSearcherWrapper getSearcherWrapper() { + return searcherWrapper.get(); + } + + public void setSearcherWrapper(IndexSearcherWrapper searcherWrapper) { + this.searcherWrapper.set(searcherWrapper); } /** diff --git a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java index 665d17a2f86..8a407f00eae 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java +++ b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrapper.java @@ -26,7 +26,7 @@ import org.apache.lucene.search.IndexSearcher; * Extension point to add custom functionality at request time to the {@link DirectoryReader} * and {@link IndexSearcher} managed by the {@link Engine}. */ -public interface IndexSearcherWrapper { +interface IndexSearcherWrapper { /** * @param reader The provided directory reader to be wrapped to add custom functionality diff --git a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java b/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java deleted file mode 100644 index 23d05f01dc7..00000000000 --- a/core/src/main/java/org/elasticsearch/index/engine/IndexSearcherWrappingService.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.engine; - -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.search.IndexSearcher; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.index.engine.Engine.Searcher; - -import java.util.Set; - -/** - * Service responsible for wrapping the {@link DirectoryReader} and {@link IndexSearcher} of a {@link Searcher} via the - * configured {@link IndexSearcherWrapper} instance. This allows custom functionally to be added the {@link Searcher} - * before being used to do an operation (search, get, field stats etc.) - */ -// TODO: This needs extension point is a bit hacky now, because the IndexSearch from the engine can only be wrapped once, -// if we allowed the IndexSearcher to be wrapped multiple times then a custom IndexSearcherWrapper needs have good -// control over its location in the wrapping chain -public final class IndexSearcherWrappingService { - - private final IndexSearcherWrapper wrapper; - - // for unit tests: - IndexSearcherWrappingService() { - this.wrapper = null; - } - - @Inject - // Use a Set parameter here, because constructor parameter can't be optional - // and I prefer to keep the `wrapper` field final. - public IndexSearcherWrappingService(Set wrappers) { - if (wrappers.size() > 1) { - throw new IllegalStateException("wrapping of the index searcher by more than one wrappers is forbidden, found the following wrappers [" + wrappers + "]"); - } - if (wrappers.isEmpty()) { - this.wrapper = null; - } else { - this.wrapper = wrappers.iterator().next(); - } - } - - /** - * If there are configured {@link IndexSearcherWrapper} instances, the {@link IndexSearcher} of the provided engine searcher - * gets wrapped and a new {@link Searcher} instances is returned, otherwise the provided {@link Searcher} is returned. - * - * This is invoked each time a {@link Searcher} is requested to do an operation. (for example search) - */ - public Searcher wrap(EngineConfig engineConfig, final Searcher engineSearcher) throws EngineException { - if (wrapper == null) { - return engineSearcher; - } - - DirectoryReader reader = wrapper.wrap((DirectoryReader) engineSearcher.reader()); - IndexSearcher innerIndexSearcher = new IndexSearcher(reader); - innerIndexSearcher.setQueryCache(engineConfig.getQueryCache()); - innerIndexSearcher.setQueryCachingPolicy(engineConfig.getQueryCachingPolicy()); - innerIndexSearcher.setSimilarity(engineConfig.getSimilarity()); - // TODO: Right now IndexSearcher isn't wrapper friendly, when it becomes wrapper friendly we should revise this extension point - // For example if IndexSearcher#rewrite() is overwritten than also IndexSearcher#createNormalizedWeight needs to be overwritten - // This needs to be fixed before we can allow the IndexSearcher from Engine to be wrapped multiple times - IndexSearcher indexSearcher = wrapper.wrap(engineConfig, innerIndexSearcher); - if (reader == engineSearcher.reader() && indexSearcher == innerIndexSearcher) { - return engineSearcher; - } else { - return new Engine.Searcher(engineSearcher.source(), indexSearcher) { - - @Override - public void close() throws ElasticsearchException { - engineSearcher.close(); - } - }; - } - } - -} diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c98a9c0f9dd..2137a42c658 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -51,11 +51,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.gateway.MetaDataStateFormat; +import org.elasticsearch.index.IndexServicesProvider; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.cache.IndexCache; @@ -99,7 +99,6 @@ import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.translog.TranslogWriter; import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.WarmerStats; -import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.indices.cache.query.IndicesQueryCache; @@ -161,7 +160,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett protected volatile IndexShardState state; protected final AtomicReference currentEngineReference = new AtomicReference<>(); protected final EngineFactory engineFactory; - private final IndexSearcherWrappingService wrappingService; @Nullable private RecoveryState recoveryState; @@ -193,39 +191,33 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett private EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); @Inject - public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store, - ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, - IndicesQueryCache indicesQueryCache, CodecService codecService, - TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, - @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, - ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) { + public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) { super(shardId, indexSettings); - this.codecService = codecService; - this.warmer = warmer; + this.codecService = provider.getCodecService(); + this.warmer = provider.getWarmer(); this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); - this.similarityService = similarityService; - this.wrappingService = wrappingService; + this.similarityService = provider.getSimilarityService(); Objects.requireNonNull(store, "Store must be provided to the index shard"); - this.engineFactory = factory; - this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; + this.engineFactory = provider.getFactory(); + this.indicesLifecycle = (InternalIndicesLifecycle) provider.getIndicesLifecycle(); this.store = store; this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings); - this.threadPool = threadPool; - this.mapperService = mapperService; - this.queryParserService = queryParserService; - this.indexCache = indexCache; - this.indexAliasesService = indexAliasesService; + this.threadPool = provider.getThreadPool(); + this.mapperService = provider.getMapperService(); + this.queryParserService = provider.getQueryParserService(); + this.indexCache = provider.getIndexCache(); + this.indexAliasesService = provider.getIndexAliasesService(); this.indexingService = new ShardIndexingService(shardId, indexSettings); this.getService = new ShardGetService(this, mapperService); - this.termVectorsService = termVectorsService; + this.termVectorsService = provider.getTermVectorsService(); this.searchService = new ShardSearchStats(indexSettings); this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings); - this.indicesQueryCache = indicesQueryCache; + this.indicesQueryCache = provider.getIndicesQueryCache(); this.shardQueryCache = new ShardRequestCache(shardId, indexSettings); this.shardFieldData = new ShardFieldData(); this.shardPercolateService = new ShardPercolateService(shardId, indexSettings); + this.indexFieldDataService = provider.getIndexFieldDataService(); this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, indicesLifecycle, mapperService, indexFieldDataService, shardPercolateService); - this.indexFieldDataService = indexFieldDataService; this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings); state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL); @@ -238,7 +230,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false"); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, getFromSettings(logger, indexSettings, Translog.Durabilty.REQUEST), - bigArrays, threadPool); + provider.getBigArrays(), threadPool); final QueryCachingPolicy cachingPolicy; // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis @@ -1403,7 +1395,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett }; return new EngineConfig(shardId, threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig, - mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, wrappingService, translogConfig); + mapperService.indexAnalyzer(), similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig); } private static class IndexShardOperationCounter extends AbstractRefCounted { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java index 188669f3fb2..2d97eea08d6 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardModule.java @@ -21,12 +21,7 @@ package org.elasticsearch.index.shard; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.engine.IndexSearcherWrapper; -import org.elasticsearch.index.engine.IndexSearcherWrappingService; -import org.elasticsearch.index.engine.EngineFactory; -import org.elasticsearch.index.engine.InternalEngineFactory; /** * The {@code IndexShardModule} module is responsible for binding the correct @@ -39,8 +34,7 @@ public class IndexShardModule extends AbstractModule { private final Settings settings; private final boolean primary; - // pkg private so tests can mock - Class engineFactoryImpl = InternalEngineFactory.class; + public IndexShardModule(ShardId shardId, boolean primary, Settings settings) { this.settings = settings; @@ -64,13 +58,5 @@ public class IndexShardModule extends AbstractModule { } else { bind(IndexShard.class).asEagerSingleton(); } - - bind(EngineFactory.class).to(engineFactoryImpl); - bind(IndexSearcherWrappingService.class).asEagerSingleton(); - // this injects an empty set in IndexSearcherWrappingService, otherwise guice can't construct IndexSearcherWrappingService - Multibinder multibinder - = Multibinder.newSetBinder(binder(), IndexSearcherWrapper.class); } - - } \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 62fa928faf1..c81b9e5c541 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -18,32 +18,14 @@ */ package org.elasticsearch.index.shard; -import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.index.aliases.IndexAliasesService; -import org.elasticsearch.index.cache.IndexCache; -import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.engine.IndexSearcherWrappingService; +import org.elasticsearch.index.IndexServicesProvider; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineFactory; -import org.elasticsearch.index.fielddata.IndexFieldDataService; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.MergeStats; -import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.settings.IndexSettings; -import org.elasticsearch.index.settings.IndexSettingsService; -import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.termvectors.TermVectorsService; -import org.elasticsearch.indices.IndicesLifecycle; -import org.elasticsearch.indices.IndicesWarmer; -import org.elasticsearch.indices.cache.query.IndicesQueryCache; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -55,23 +37,8 @@ import java.io.IOException; */ public final class ShadowIndexShard extends IndexShard { - @Inject - public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, - IndicesLifecycle indicesLifecycle, Store store, - ThreadPool threadPool, MapperService mapperService, - IndexQueryParserService queryParserService, IndexCache indexCache, - IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, - CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, - @Nullable IndicesWarmer warmer, - SimilarityService similarityService, - EngineFactory factory, - ShardPath path, BigArrays bigArrays, IndexSearcherWrappingService wrappingService) throws IOException { - super(shardId, indexSettings, indicesLifecycle, store, - threadPool, mapperService, queryParserService, indexCache, indexAliasesService, - indicesQueryCache, codecService, - termVectorsService, indexFieldDataService, - warmer, similarityService, - factory, path, bigArrays, wrappingService); + public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) throws IOException { + super(shardId, indexSettings, path, store, provider); } /** diff --git a/core/src/main/java/org/elasticsearch/index/store/IndexStore.java b/core/src/main/java/org/elasticsearch/index/store/IndexStore.java index 4022dd75aa1..350eb7a0d81 100644 --- a/core/src/main/java/org/elasticsearch/index/store/IndexStore.java +++ b/core/src/main/java/org/elasticsearch/index/store/IndexStore.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.store.IndicesStore; import java.io.Closeable; @@ -112,7 +113,8 @@ public class IndexStore extends AbstractIndexComponent implements Closeable { /** * The shard store class that should be used for each shard. */ - public Class shardDirectory() { - return FsDirectoryService.class; + public DirectoryService newDirectoryService(ShardPath path) { + return new FsDirectoryService(indexSettings, this, path); } + } diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java index 3b24544267d..e2448670f83 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -346,7 +346,7 @@ public class IndicesService extends AbstractLifecycleComponent i modules.add(new IndexFieldDataModule(indexSettings)); modules.add(new MapperServiceModule()); modules.add(new IndexAliasesServiceModule()); - modules.add(new IndexModule(indexSettings)); + modules.add(new IndexModule()); pluginsService.processModules(modules); diff --git a/core/src/main/java/org/elasticsearch/plugins/Plugin.java b/core/src/main/java/org/elasticsearch/plugins/Plugin.java index 72077954ea8..04f789e5c34 100644 --- a/core/src/main/java/org/elasticsearch/plugins/Plugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/Plugin.java @@ -73,13 +73,6 @@ public abstract class Plugin { return Collections.emptyList(); } - /** - * Per index shard module. - */ - public Collection shardModules(Settings indexSettings) { - return Collections.emptyList(); - } - /** * Per index shard service that will be automatically closed. */ diff --git a/core/src/main/java/org/elasticsearch/plugins/PluginsService.java b/core/src/main/java/org/elasticsearch/plugins/PluginsService.java index 5834efc398d..cd71fdbc785 100644 --- a/core/src/main/java/org/elasticsearch/plugins/PluginsService.java +++ b/core/src/main/java/org/elasticsearch/plugins/PluginsService.java @@ -250,14 +250,6 @@ public class PluginsService extends AbstractComponent { return services; } - public Collection shardModules(Settings indexSettings) { - List modules = new ArrayList<>(); - for (Tuple plugin : plugins) { - modules.addAll(plugin.v2().shardModules(indexSettings)); - } - return modules; - } - public Collection> shardServices() { List> services = new ArrayList<>(); for (Tuple plugin : plugins) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/MockEngineFactoryPlugin.java b/core/src/test/java/org/elasticsearch/index/MockEngineFactoryPlugin.java similarity index 88% rename from core/src/test/java/org/elasticsearch/index/shard/MockEngineFactoryPlugin.java rename to core/src/test/java/org/elasticsearch/index/MockEngineFactoryPlugin.java index d1b50487c63..94ddde0e3fb 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/MockEngineFactoryPlugin.java +++ b/core/src/test/java/org/elasticsearch/index/MockEngineFactoryPlugin.java @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.index.shard; +package org.elasticsearch.index; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.engine.MockEngineFactory; import org.elasticsearch.test.engine.MockEngineSupportModule; @@ -27,7 +28,7 @@ import org.elasticsearch.test.engine.MockEngineSupportModule; import java.util.Collection; import java.util.Collections; -// this must exist in the same package as IndexShardModule to allow access to setting the impl +// this must exist in the same package as IndexModule to allow access to setting the impl public class MockEngineFactoryPlugin extends Plugin { @Override public String name() { @@ -41,7 +42,7 @@ public class MockEngineFactoryPlugin extends Plugin { public Collection indexModules(Settings indexSettings) { return Collections.singletonList(new MockEngineSupportModule()); } - public void onModule(IndexShardModule module) { + public void onModule(IndexModule module) { module.engineFactoryImpl = MockEngineFactory.class; } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 01197fbfc5b..62964244344 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -232,15 +232,15 @@ public class InternalEngineTests extends ESTestCase { return new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); } - protected InternalEngine createEngine(Store store, Path translogPath, IndexSearcherWrapper... wrappers) { - return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy(), wrappers); + protected InternalEngine createEngine(Store store, Path translogPath) { + return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy()); } - protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... wrappers) { - return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy, wrappers), false); + protected InternalEngine createEngine(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) { + return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy), false); } - public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy, IndexSearcherWrapper... wrappers) { + public EngineConfig config(Settings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool); @@ -251,7 +251,7 @@ public class InternalEngineTests extends ESTestCase { public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test } - }, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(new HashSet<>(Arrays.asList(wrappers))), translogConfig); + }, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); try { config.setCreate(Lucene.indexExists(store.directory()) == false); } catch (IOException e) { @@ -514,7 +514,10 @@ public class InternalEngineTests extends ESTestCase { }; Store store = createStore(); Path translog = createTempDir("translog-test"); - InternalEngine engine = createEngine(store, translog, wrapper); + InternalEngine engine = createEngine(store, translog); + engine.close(); + engine.config().setSearcherWrapper(wrapper); + engine = new InternalEngine(engine.config(), false); Engine.Searcher searcher = engine.acquireSearcher("test"); assertThat(counter.get(), equalTo(2)); searcher.close(); @@ -1951,7 +1954,7 @@ public class InternalEngineTests extends ESTestCase { EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings() , null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(), config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener() - , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig); + , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); try { new InternalEngine(brokenConfig, false); diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java index a6ca90a73db..b5987a92623 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java @@ -216,7 +216,7 @@ public class ShadowEngineTests extends ESTestCase { @Override public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) { // we don't need to notify anybody in this test - }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), new IndexSearcherWrappingService(), translogConfig); + }}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig); try { config.setCreate(Lucene.indexExists(store.directory()) == false); } catch (IOException e) { diff --git a/core/src/test/java/org/elasticsearch/indices/leaks/IndicesLeaksIT.java b/core/src/test/java/org/elasticsearch/indices/leaks/IndicesLeaksIT.java deleted file mode 100644 index 422fee6879f..00000000000 --- a/core/src/test/java/org/elasticsearch/indices/leaks/IndicesLeaksIT.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.indices.leaks; - -import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.ESIntegTestCase.ClusterScope; -import org.junit.Test; - -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.List; - -import static org.elasticsearch.test.ESIntegTestCase.Scope; -import static org.hamcrest.Matchers.nullValue; - -/** - */ -@ClusterScope(scope= Scope.TEST, numDataNodes =1) -public class IndicesLeaksIT extends ESIntegTestCase { - - - @SuppressWarnings({"ConstantConditions", "unchecked"}) - @Test - @BadApple(bugUrl = "https://github.com/elasticsearch/elasticsearch/issues/3232") - public void testIndexShardLifecycleLeak() throws Exception { - - client().admin().indices().prepareCreate("test") - .setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)) - .execute().actionGet(); - - client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); - - IndicesService indicesService = internalCluster().getDataNodeInstance(IndicesService.class); - IndexService indexService = indicesService.indexServiceSafe("test"); - Injector indexInjector = indexService.injector(); - IndexShard shard = indexService.shardSafe(0); - Injector shardInjector = indexService.shardInjectorSafe(0); - - performCommonOperations(); - - List indexReferences = new ArrayList<>(); - List shardReferences = new ArrayList<>(); - - // TODO if we could iterate over the already created classes on the injector, we can just add them here to the list - // for now, we simple add some classes that make sense - - // add index references - indexReferences.add(new WeakReference(indexService)); - indexReferences.add(new WeakReference(indexInjector)); - indexReferences.add(new WeakReference(indexService.mapperService())); - for (DocumentMapper documentMapper : indexService.mapperService().docMappers(true)) { - indexReferences.add(new WeakReference(documentMapper)); - } - indexReferences.add(new WeakReference(indexService.aliasesService())); - indexReferences.add(new WeakReference(indexService.analysisService())); - indexReferences.add(new WeakReference(indexService.fieldData())); - indexReferences.add(new WeakReference(indexService.queryParserService())); - - - // add shard references - shardReferences.add(new WeakReference(shard)); - shardReferences.add(new WeakReference(shardInjector)); - - indexService = null; - indexInjector = null; - shard = null; - shardInjector = null; - - cluster().wipeIndices("test"); - - for (int i = 0; i < 100; i++) { - System.gc(); - int indexNotCleared = 0; - for (WeakReference indexReference : indexReferences) { - if (indexReference.get() != null) { - indexNotCleared++; - } - } - int shardNotCleared = 0; - for (WeakReference shardReference : shardReferences) { - if (shardReference.get() != null) { - shardNotCleared++; - } - } - logger.info("round {}, indices {}/{}, shards {}/{}", i, indexNotCleared, indexReferences.size(), shardNotCleared, shardReferences.size()); - if (indexNotCleared == 0 && shardNotCleared == 0) { - break; - } - } - - //System.out.println("sleeping");Thread.sleep(1000000); - - for (WeakReference indexReference : indexReferences) { - assertThat("dangling index reference: " + indexReference.get(), indexReference.get(), nullValue()); - } - - for (WeakReference shardReference : shardReferences) { - assertThat("dangling shard reference: " + shardReference.get(), shardReference.get(), nullValue()); - } - } - - private void performCommonOperations() { - client().prepareIndex("test", "type", "1").setSource("field1", "value", "field2", 2, "field3", 3.0f).execute().actionGet(); - client().admin().indices().prepareRefresh().execute().actionGet(); - client().prepareSearch("test").setQuery(QueryBuilders.queryStringQuery("field1:value")).execute().actionGet(); - client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field1", "value")).execute().actionGet(); - } -} diff --git a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 53733bca3e3..4d107d1151d 100644 --- a/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/core/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -68,7 +68,7 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.MockEngineFactoryPlugin; +import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; diff --git a/core/src/test/java/org/elasticsearch/test/store/MockFSIndexStore.java b/core/src/test/java/org/elasticsearch/test/store/MockFSIndexStore.java index c5b5ac36f00..11a791c04f3 100644 --- a/core/src/test/java/org/elasticsearch/test/store/MockFSIndexStore.java +++ b/core/src/test/java/org/elasticsearch/test/store/MockFSIndexStore.java @@ -24,14 +24,19 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStoreModule; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.plugins.Plugin; public class MockFSIndexStore extends IndexStore { + private final IndicesService indicesService; + public static class TestPlugin extends Plugin { @Override public String name() { @@ -52,13 +57,13 @@ public class MockFSIndexStore extends IndexStore { @Inject public MockFSIndexStore(Index index, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, - IndicesStore indicesStore) { + IndicesStore indicesStore, IndicesService indicesService) { super(index, indexSettings, indexSettingsService, indicesStore); + this.indicesService = indicesService; } - @Override - public Class shardDirectory() { - return MockFSDirectoryService.class; + public DirectoryService newDirectoryService(ShardPath path) { + return new MockFSDirectoryService(indexSettings, this, indicesService, path); } }