From 33521fc27c77d454ba20308a578f5911a5d3dd39 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 22 Mar 2016 10:30:33 +0100 Subject: [PATCH] Detach IndexShard from node services this is the last step to remove node level service from IndexShard. This means that tests can now more easily create an IndexShard instance without starting a node and removes the dependency between IndexShard and Client/ScriptService --- .../resources/checkstyle_suppressions.xml | 2 - .../org/elasticsearch/index/IndexService.java | 53 ++++++++++++++----- .../elasticsearch/index/shard/IndexShard.java | 8 +-- .../index/shard/ShadowIndexShard.java | 13 +++-- .../index/shard/IndexShardTests.java | 3 +- .../test/ESSingleNodeTestCase.java | 4 +- 6 files changed, 55 insertions(+), 28 deletions(-) diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 72e10cb9398..07dd29a33ad 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -435,7 +435,6 @@ - @@ -602,7 +601,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index bb73e212a77..1b55adf78cc 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; @@ -114,6 +115,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC private volatile AsyncRefreshTask refreshTask; private volatile AsyncTranslogFSync fsyncTask; private final SearchSlowLog searchSlowLog; + private final ThreadPool threadPool; + private final BigArrays bigArrays; public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv, SimilarityService similarityService, @@ -132,9 +135,13 @@ public final class IndexService extends AbstractIndexComponent implements IndexC this.indexSettings = indexSettings; this.analysisService = registry.build(indexSettings); this.similarityService = similarityService; - this.mapperService = new MapperService(indexSettings, analysisService, similarityService, mapperRegistry, IndexService.this::newQueryShardContext); - this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, nodeServicesProvider.getCircuitBreakerService(), mapperService); + this.mapperService = new MapperService(indexSettings, analysisService, similarityService, mapperRegistry, + IndexService.this::newQueryShardContext); + this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, + nodeServicesProvider.getCircuitBreakerService(), mapperService); this.shardStoreDeleter = shardStoreDeleter; + this.bigArrays = nodeServicesProvider.getBigArrays(); + this.threadPool = nodeServicesProvider.getThreadPool(); this.eventListener = eventListener; this.nodeEnv = nodeEnv; this.nodeServicesProvider = nodeServicesProvider; @@ -142,7 +149,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC indexFieldData.setListener(new FieldDataCacheListener(this)); this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this)); PercolatorQueryCache percolatorQueryCache = new PercolatorQueryCache(indexSettings, IndexService.this::newQueryShardContext); - this.warmer = new IndexWarmer(indexSettings.getSettings(), nodeServicesProvider.getThreadPool(), bitsetFilterCache.createListener(nodeServicesProvider.getThreadPool()), percolatorQueryCache.createListener(nodeServicesProvider.getThreadPool())); + this.warmer = new IndexWarmer(indexSettings.getSettings(), threadPool, + bitsetFilterCache.createListener(threadPool), + percolatorQueryCache.createListener(threadPool)); this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache, percolatorQueryCache); this.engineFactory = engineFactory; // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE @@ -232,7 +241,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } } } finally { - IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService, refreshTask, fsyncTask, cache().getPercolatorQueryCache()); + IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService, refreshTask, fsyncTask, + cache().getPercolatorQueryCache()); } } } @@ -302,7 +312,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } dataPathToShardCount.put(dataPath, curCount + 1); } - path = ShardPath.selectNewPathForShard(nodeEnv, shardId, this.indexSettings, routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), + path = ShardPath.selectNewPathForShard(nodeEnv, shardId, this.indexSettings, + routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE + ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(), dataPathToShardCount); logger.debug("{} creating using a new path [{}]", shardId, path); } else { @@ -323,11 +335,16 @@ public final class IndexService extends AbstractIndexComponent implements IndexC warmer.warm(searcher, shard, IndexService.this.indexSettings, toLevel); } }; - store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId))); + store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, + new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId))); if (useShadowEngine(primary, indexSettings)) { - indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer); // no indexing listeners - shadow engines don't index + indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, + indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, searchSlowLog, engineWarmer); + // no indexing listeners - shadow engines don't index } else { - indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer, listeners); + indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, + indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, searchSlowLog, engineWarmer, + listeners); } eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); @@ -372,7 +389,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC // and close the shard so no operations are allowed to it if (indexShard != null) { try { - final boolean flushEngine = deleted.get() == false && closed.get(); // only flush we are we closed (closed index or shutdown) and if we are not deleted + // only flush we are we closed (closed index or shutdown) and if we are not deleted + final boolean flushEngine = deleted.get() == false && closed.get(); indexShard.close(reason, flushEngine); } catch (Throwable e) { logger.debug("[{}] failed to close index shard", e, shardId); @@ -419,7 +437,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } /** - * Creates a new QueryShardContext. The context has not types set yet, if types are required set them via {@link QueryShardContext#setTypes(String...)} + * Creates a new QueryShardContext. The context has not types set yet, if types are required set them via + * {@link QueryShardContext#setTypes(String...)} */ public QueryShardContext newQueryShardContext() { return new QueryShardContext( @@ -429,8 +448,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC ); } - ThreadPool getThreadPool() { - return nodeServicesProvider.getThreadPool(); + public ThreadPool getThreadPool() { + return threadPool; + } + + public BigArrays getBigArrays() { + return bigArrays; } public SearchSlowLog getSearchSlowLog() { @@ -547,7 +570,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC AliasMetaData alias = aliases.get(aliasName); if (alias == null) { // This shouldn't happen unless alias disappeared after filteringAliases was called. - throw new InvalidAliasNameException(indexSettings.getIndex(), aliasNames[0], "Unknown alias name was passed to alias Filter"); + throw new InvalidAliasNameException(indexSettings.getIndex(), aliasNames[0], + "Unknown alias name was passed to alias Filter"); } Query parsedFilter = parse(alias, context); if (parsedFilter != null) { @@ -723,7 +747,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC } catch (Exception ex) { if (lastThrownException == null || sameException(lastThrownException, ex) == false) { // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs - indexService.logger.warn("failed to run task {} - suppressing re-occurring exceptions unless the exception changes", ex, toString()); + indexService.logger.warn("failed to run task {} - suppressing re-occurring exceptions unless the exception changes", + ex, toString()); lastThrownException = ex; } } finally { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5d54a8c22c3..7b19a12bbaa 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -46,12 +46,12 @@ import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.SuspendableRefContainer; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.NodeServicesProvider; import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.IndexCache; @@ -193,7 +193,7 @@ public class IndexShard extends AbstractIndexShardComponent { public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory, - IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, + IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays, SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) { super(shardId, indexSettings); final Settings settings = indexSettings.getSettings(); @@ -205,7 +205,7 @@ public class IndexShard extends AbstractIndexShardComponent { this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory; this.store = store; this.indexEventListener = indexEventListener; - this.threadPool = provider.getThreadPool(); + this.threadPool = threadPool; this.mapperService = mapperService; this.indexCache = indexCache; this.internalIndexingStats = new InternalIndexingStats(); @@ -226,7 +226,7 @@ public class IndexShard extends AbstractIndexShardComponent { this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, - provider.getBigArrays()); + bigArrays); final QueryCachingPolicy cachingPolicy; // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 5518d1b1273..774052b3a5f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -20,8 +20,8 @@ package org.elasticsearch.index.shard; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.NodeServicesProvider; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; @@ -33,6 +33,7 @@ import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -44,9 +45,13 @@ import java.io.IOException; */ public final class ShadowIndexShard extends IndexShard { - public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory, - IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider, SearchSlowLog searchSlowLog, Engine.Warmer engineWarmer) throws IOException { - super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider, searchSlowLog, engineWarmer); + public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, + MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, + @Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, + ThreadPool threadPool, BigArrays bigArrays, SearchSlowLog searchSlowLog, Engine.Warmer engineWarmer) + throws IOException { + super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, + indexEventListener, wrapper, threadPool, bigArrays, searchSlowLog, engineWarmer); } /** diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index cf4a0e2e0aa..be3a351cbcc 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1158,11 +1158,10 @@ public class IndexShardTests extends ESSingleNodeTestCase { private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException { ShardRouting routing = new ShardRouting(shard.routingEntry()); shard.close("simon says", true); - NodeServicesProvider indexServices = indexService.getIndexServices(); IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, - indexServices, indexService.getSearchSlowLog(), null, listeners + indexService.getThreadPool(), indexService.getBigArrays(), indexService.getSearchSlowLog(), null, listeners ); ShardRoutingHelper.reinit(routing); newShard.updateRoutingEntry(routing, false); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index 8eeb96a94bf..73576c9a840 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -282,8 +282,8 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { * Create a new search context. */ protected SearchContext createSearchContext(IndexService indexService) { - BigArrays bigArrays = indexService.getIndexServices().getBigArrays(); - ThreadPool threadPool = indexService.getIndexServices().getThreadPool(); + BigArrays bigArrays = indexService.getBigArrays(); + ThreadPool threadPool = indexService.getThreadPool(); PageCacheRecycler pageCacheRecycler = node().injector().getInstance(PageCacheRecycler.class); ScriptService scriptService = node().injector().getInstance(ScriptService.class); return new TestSearchContext(threadPool, pageCacheRecycler, bigArrays, scriptService, indexService);