mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Detach IndexShard from node services
this is the last step to remove node level service from IndexShard. This means that tests can now more easily create an IndexShard instance without starting a node and removes the dependency between IndexShard and Client/ScriptService
This commit is contained in:
parent
8f22a01bbd
commit
33521fc27c
@ -435,7 +435,6 @@
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]AlreadyExpiredException.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]CompositeIndexEventListener.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexModule.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexService.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexSettings.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexingSlowLog.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]MergePolicyConfig.java" checks="LineLength" />
|
||||
@ -602,7 +601,6 @@
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]IndexSearcherWrapper.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]IndexShard.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]IndexingStats.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]ShadowIndexShard.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]ShardPath.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]ShardStateMetaData.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]shard[/\\]StoreRecovery.java" checks="LineLength" />
|
||||
|
@ -44,6 +44,7 @@ import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
@ -114,6 +115,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
private volatile AsyncRefreshTask refreshTask;
|
||||
private volatile AsyncTranslogFSync fsyncTask;
|
||||
private final SearchSlowLog searchSlowLog;
|
||||
private final ThreadPool threadPool;
|
||||
private final BigArrays bigArrays;
|
||||
|
||||
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
|
||||
SimilarityService similarityService,
|
||||
@ -132,9 +135,13 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
this.indexSettings = indexSettings;
|
||||
this.analysisService = registry.build(indexSettings);
|
||||
this.similarityService = similarityService;
|
||||
this.mapperService = new MapperService(indexSettings, analysisService, similarityService, mapperRegistry, IndexService.this::newQueryShardContext);
|
||||
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, nodeServicesProvider.getCircuitBreakerService(), mapperService);
|
||||
this.mapperService = new MapperService(indexSettings, analysisService, similarityService, mapperRegistry,
|
||||
IndexService.this::newQueryShardContext);
|
||||
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache,
|
||||
nodeServicesProvider.getCircuitBreakerService(), mapperService);
|
||||
this.shardStoreDeleter = shardStoreDeleter;
|
||||
this.bigArrays = nodeServicesProvider.getBigArrays();
|
||||
this.threadPool = nodeServicesProvider.getThreadPool();
|
||||
this.eventListener = eventListener;
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.nodeServicesProvider = nodeServicesProvider;
|
||||
@ -142,7 +149,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
indexFieldData.setListener(new FieldDataCacheListener(this));
|
||||
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
|
||||
PercolatorQueryCache percolatorQueryCache = new PercolatorQueryCache(indexSettings, IndexService.this::newQueryShardContext);
|
||||
this.warmer = new IndexWarmer(indexSettings.getSettings(), nodeServicesProvider.getThreadPool(), bitsetFilterCache.createListener(nodeServicesProvider.getThreadPool()), percolatorQueryCache.createListener(nodeServicesProvider.getThreadPool()));
|
||||
this.warmer = new IndexWarmer(indexSettings.getSettings(), threadPool,
|
||||
bitsetFilterCache.createListener(threadPool),
|
||||
percolatorQueryCache.createListener(threadPool));
|
||||
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache, percolatorQueryCache);
|
||||
this.engineFactory = engineFactory;
|
||||
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
|
||||
@ -232,7 +241,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService, refreshTask, fsyncTask, cache().getPercolatorQueryCache());
|
||||
IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService, refreshTask, fsyncTask,
|
||||
cache().getPercolatorQueryCache());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -302,7 +312,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
}
|
||||
dataPathToShardCount.put(dataPath, curCount + 1);
|
||||
}
|
||||
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, this.indexSettings, routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE ? getAvgShardSizeInBytes() : routing.getExpectedShardSize(),
|
||||
path = ShardPath.selectNewPathForShard(nodeEnv, shardId, this.indexSettings,
|
||||
routing.getExpectedShardSize() == ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE
|
||||
? getAvgShardSizeInBytes() : routing.getExpectedShardSize(),
|
||||
dataPathToShardCount);
|
||||
logger.debug("{} creating using a new path [{}]", shardId, path);
|
||||
} else {
|
||||
@ -323,11 +335,16 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
warmer.warm(searcher, shard, IndexService.this.indexSettings, toLevel);
|
||||
}
|
||||
};
|
||||
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId)));
|
||||
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock,
|
||||
new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId)));
|
||||
if (useShadowEngine(primary, indexSettings)) {
|
||||
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer); // no indexing listeners - shadow engines don't index
|
||||
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
|
||||
indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, searchSlowLog, engineWarmer);
|
||||
// no indexing listeners - shadow engines don't index
|
||||
} else {
|
||||
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer, listeners);
|
||||
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
|
||||
indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, searchSlowLog, engineWarmer,
|
||||
listeners);
|
||||
}
|
||||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
||||
eventListener.afterIndexShardCreated(indexShard);
|
||||
@ -372,7 +389,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
// and close the shard so no operations are allowed to it
|
||||
if (indexShard != null) {
|
||||
try {
|
||||
final boolean flushEngine = deleted.get() == false && closed.get(); // only flush we are we closed (closed index or shutdown) and if we are not deleted
|
||||
// only flush we are we closed (closed index or shutdown) and if we are not deleted
|
||||
final boolean flushEngine = deleted.get() == false && closed.get();
|
||||
indexShard.close(reason, flushEngine);
|
||||
} catch (Throwable e) {
|
||||
logger.debug("[{}] failed to close index shard", e, shardId);
|
||||
@ -419,7 +437,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new QueryShardContext. The context has not types set yet, if types are required set them via {@link QueryShardContext#setTypes(String...)}
|
||||
* Creates a new QueryShardContext. The context has not types set yet, if types are required set them via
|
||||
* {@link QueryShardContext#setTypes(String...)}
|
||||
*/
|
||||
public QueryShardContext newQueryShardContext() {
|
||||
return new QueryShardContext(
|
||||
@ -429,8 +448,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
);
|
||||
}
|
||||
|
||||
ThreadPool getThreadPool() {
|
||||
return nodeServicesProvider.getThreadPool();
|
||||
public ThreadPool getThreadPool() {
|
||||
return threadPool;
|
||||
}
|
||||
|
||||
public BigArrays getBigArrays() {
|
||||
return bigArrays;
|
||||
}
|
||||
|
||||
public SearchSlowLog getSearchSlowLog() {
|
||||
@ -547,7 +570,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
AliasMetaData alias = aliases.get(aliasName);
|
||||
if (alias == null) {
|
||||
// This shouldn't happen unless alias disappeared after filteringAliases was called.
|
||||
throw new InvalidAliasNameException(indexSettings.getIndex(), aliasNames[0], "Unknown alias name was passed to alias Filter");
|
||||
throw new InvalidAliasNameException(indexSettings.getIndex(), aliasNames[0],
|
||||
"Unknown alias name was passed to alias Filter");
|
||||
}
|
||||
Query parsedFilter = parse(alias, context);
|
||||
if (parsedFilter != null) {
|
||||
@ -723,7 +747,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
||||
} catch (Exception ex) {
|
||||
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
|
||||
// prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
|
||||
indexService.logger.warn("failed to run task {} - suppressing re-occurring exceptions unless the exception changes", ex, toString());
|
||||
indexService.logger.warn("failed to run task {} - suppressing re-occurring exceptions unless the exception changes",
|
||||
ex, toString());
|
||||
lastThrownException = ex;
|
||||
}
|
||||
} finally {
|
||||
|
@ -46,12 +46,12 @@ import org.elasticsearch.common.metrics.MeanMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.Callback;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.SuspendableRefContainer;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.NodeServicesProvider;
|
||||
import org.elasticsearch.index.SearchSlowLog;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
@ -193,7 +193,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
|
||||
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
|
||||
@Nullable EngineFactory engineFactory,
|
||||
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider,
|
||||
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays,
|
||||
SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) {
|
||||
super(shardId, indexSettings);
|
||||
final Settings settings = indexSettings.getSettings();
|
||||
@ -205,7 +205,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
|
||||
this.store = store;
|
||||
this.indexEventListener = indexEventListener;
|
||||
this.threadPool = provider.getThreadPool();
|
||||
this.threadPool = threadPool;
|
||||
this.mapperService = mapperService;
|
||||
this.indexCache = indexCache;
|
||||
this.internalIndexingStats = new InternalIndexingStats();
|
||||
@ -226,7 +226,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||
|
||||
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
|
||||
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings,
|
||||
provider.getBigArrays());
|
||||
bigArrays);
|
||||
final QueryCachingPolicy cachingPolicy;
|
||||
// the query cache is a node-level thing, however we want the most popular filters
|
||||
// to be computed on a per-shard basis
|
||||
|
@ -20,8 +20,8 @@ package org.elasticsearch.index.shard;
|
||||
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.NodeServicesProvider;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineConfig;
|
||||
@ -33,6 +33,7 @@ import org.elasticsearch.index.SearchSlowLog;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.TranslogStats;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@ -44,9 +45,13 @@ import java.io.IOException;
|
||||
*/
|
||||
public final class ShadowIndexShard extends IndexShard {
|
||||
|
||||
public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory,
|
||||
IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider, SearchSlowLog searchSlowLog, Engine.Warmer engineWarmer) throws IOException {
|
||||
super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider, searchSlowLog, engineWarmer);
|
||||
public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
|
||||
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
|
||||
@Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper wrapper,
|
||||
ThreadPool threadPool, BigArrays bigArrays, SearchSlowLog searchSlowLog, Engine.Warmer engineWarmer)
|
||||
throws IOException {
|
||||
super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory,
|
||||
indexEventListener, wrapper, threadPool, bigArrays, searchSlowLog, engineWarmer);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1158,11 +1158,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
||||
private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper, IndexingOperationListener... listeners) throws IOException {
|
||||
ShardRouting routing = new ShardRouting(shard.routingEntry());
|
||||
shard.close("simon says", true);
|
||||
NodeServicesProvider indexServices = indexService.getIndexServices();
|
||||
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(),
|
||||
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
|
||||
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
|
||||
indexServices, indexService.getSearchSlowLog(), null, listeners
|
||||
indexService.getThreadPool(), indexService.getBigArrays(), indexService.getSearchSlowLog(), null, listeners
|
||||
);
|
||||
ShardRoutingHelper.reinit(routing);
|
||||
newShard.updateRoutingEntry(routing, false);
|
||||
|
@ -282,8 +282,8 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
|
||||
* Create a new search context.
|
||||
*/
|
||||
protected SearchContext createSearchContext(IndexService indexService) {
|
||||
BigArrays bigArrays = indexService.getIndexServices().getBigArrays();
|
||||
ThreadPool threadPool = indexService.getIndexServices().getThreadPool();
|
||||
BigArrays bigArrays = indexService.getBigArrays();
|
||||
ThreadPool threadPool = indexService.getThreadPool();
|
||||
PageCacheRecycler pageCacheRecycler = node().injector().getInstance(PageCacheRecycler.class);
|
||||
ScriptService scriptService = node().injector().getInstance(ScriptService.class);
|
||||
return new TestSearchContext(threadPool, pageCacheRecycler, bigArrays, scriptService, indexService);
|
||||
|
Loading…
x
Reference in New Issue
Block a user