more index level cleanups
This commit is contained in:
parent
d6b1f4ce6c
commit
e94f242456
|
@ -26,9 +26,7 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
|
@ -48,11 +46,8 @@ import org.elasticsearch.index.shard.*;
|
|||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.IndicesLifecycle;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.InternalIndicesLifecycle;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.plugins.PluginsService;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -71,69 +66,48 @@ import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
|
|||
*/
|
||||
public class IndexService extends AbstractIndexComponent implements IndexComponent, Iterable<IndexShard> {
|
||||
|
||||
private final Injector injector;
|
||||
|
||||
private final Settings indexSettings;
|
||||
|
||||
private final PluginsService pluginsService;
|
||||
|
||||
private final InternalIndicesLifecycle indicesLifecycle;
|
||||
|
||||
private final AnalysisService analysisService;
|
||||
|
||||
private final MapperService mapperService;
|
||||
|
||||
private final IndexQueryParserService queryParserService;
|
||||
|
||||
private final SimilarityService similarityService;
|
||||
|
||||
private final IndexAliasesService aliasesService;
|
||||
|
||||
private final IndexCache indexCache;
|
||||
|
||||
private final IndexFieldDataService indexFieldData;
|
||||
|
||||
private final BitsetFilterCache bitsetFilterCache;
|
||||
|
||||
private final IndexSettingsService settingsService;
|
||||
|
||||
private final NodeEnvironment nodeEnv;
|
||||
private final IndicesService indicesServices;
|
||||
private final IndexServicesProvider indexServicesProvider;
|
||||
private final IndexStore indexStore;
|
||||
|
||||
private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of();
|
||||
|
||||
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private final AtomicBoolean deleted = new AtomicBoolean(false);
|
||||
|
||||
@Inject
|
||||
public IndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv,
|
||||
AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService,
|
||||
SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache,
|
||||
public IndexService(Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv,
|
||||
AnalysisService analysisService,
|
||||
IndexSettingsService settingsService,
|
||||
IndexFieldDataService indexFieldData, BitsetFilterCache bitSetFilterCache, IndicesService indicesServices) {
|
||||
|
||||
IndexFieldDataService indexFieldData,
|
||||
BitsetFilterCache bitSetFilterCache,
|
||||
IndicesService indicesServices,
|
||||
IndexServicesProvider indexServicesProvider,
|
||||
IndexStore indexStore) {
|
||||
super(index, indexSettings);
|
||||
this.injector = injector;
|
||||
this.indexSettings = indexSettings;
|
||||
this.analysisService = analysisService;
|
||||
this.mapperService = mapperService;
|
||||
this.queryParserService = queryParserService;
|
||||
this.similarityService = similarityService;
|
||||
this.aliasesService = aliasesService;
|
||||
this.indexCache = indexCache;
|
||||
this.indexFieldData = indexFieldData;
|
||||
this.settingsService = settingsService;
|
||||
this.bitsetFilterCache = bitSetFilterCache;
|
||||
|
||||
this.pluginsService = injector.getInstance(PluginsService.class);
|
||||
this.indicesServices = indicesServices;
|
||||
this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
|
||||
|
||||
// inject workarounds for cyclic dep
|
||||
this.indicesLifecycle = (InternalIndicesLifecycle) indexServicesProvider.getIndicesLifecycle();
|
||||
this.nodeEnv = nodeEnv;
|
||||
this.indexServicesProvider = indexServicesProvider;
|
||||
this.indexStore = indexStore;
|
||||
indexFieldData.setListener(new FieldDataCacheListener(this));
|
||||
bitSetFilterCache.setListener(new BitsetCacheListener(this));
|
||||
this.nodeEnv = nodeEnv;
|
||||
}
|
||||
|
||||
public int numberOfShards() {
|
||||
|
@ -176,16 +150,12 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
return shards.keySet();
|
||||
}
|
||||
|
||||
public Injector injector() {
|
||||
return injector;
|
||||
}
|
||||
|
||||
public IndexSettingsService settingsService() {
|
||||
return this.settingsService;
|
||||
}
|
||||
|
||||
public IndexCache cache() {
|
||||
return indexCache;
|
||||
return indexServicesProvider.getIndexCache();
|
||||
}
|
||||
|
||||
public IndexFieldDataService fieldData() {
|
||||
|
@ -201,19 +171,19 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
}
|
||||
|
||||
public MapperService mapperService() {
|
||||
return mapperService;
|
||||
return indexServicesProvider.getMapperService();
|
||||
}
|
||||
|
||||
public IndexQueryParserService queryParserService() {
|
||||
return queryParserService;
|
||||
return indexServicesProvider.getQueryParserService();
|
||||
}
|
||||
|
||||
public SimilarityService similarityService() {
|
||||
return similarityService;
|
||||
return indexServicesProvider.getSimilarityService();
|
||||
}
|
||||
|
||||
public IndexAliasesService aliasesService() {
|
||||
return aliasesService;
|
||||
return indexServicesProvider.getIndexAliasesService();
|
||||
}
|
||||
|
||||
public synchronized void close(final String reason, boolean delete) {
|
||||
|
@ -288,7 +258,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
if (path == null) {
|
||||
// TODO: we should, instead, hold a "bytes reserved" of how large we anticipate this shard will be, e.g. for a shard
|
||||
// that's being relocated/replicated we know how large it will become once it's done copying:
|
||||
|
||||
// Count up how many shards are currently on each data path:
|
||||
Map<Path,Integer> dataPathToShardCount = new HashMap<>();
|
||||
for(IndexShard shard : this) {
|
||||
|
@ -314,12 +283,11 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
|
||||
final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false ||
|
||||
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
|
||||
IndexStore indexStore = injector.getInstance(IndexStore.class);
|
||||
store = new Store(shardId, indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> injector.getInstance(IndicesQueryCache.class).onClose(shardId)));
|
||||
store = new Store(shardId, indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> indexServicesProvider.getIndicesQueryCache().onClose(shardId)));
|
||||
if (useShadowEngine(primary, indexSettings)) {
|
||||
indexShard = new ShadowIndexShard(shardId, indexSettings, path, store, injector.getInstance(IndexServicesProvider.class));
|
||||
indexShard = new ShadowIndexShard(shardId, indexSettings, path, store, indexServicesProvider);
|
||||
} else {
|
||||
indexShard = new IndexShard(shardId, indexSettings, path, store, injector.getInstance(IndexServicesProvider.class));
|
||||
indexShard = new IndexShard(shardId, indexSettings, path, store, indexServicesProvider);
|
||||
}
|
||||
|
||||
indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created");
|
||||
|
@ -407,6 +375,10 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
|
|||
}
|
||||
}
|
||||
|
||||
public IndexServicesProvider getIndexServices() {
|
||||
return indexServicesProvider;
|
||||
}
|
||||
|
||||
private class StoreCloseListener implements Store.OnClose {
|
||||
private final ShardId shardId;
|
||||
private final boolean ownsShard;
|
||||
|
|
|
@ -57,9 +57,6 @@ public final class IndexServicesProvider {
|
|||
private final SimilarityService similarityService;
|
||||
private final EngineFactory factory;
|
||||
private final BigArrays bigArrays;
|
||||
|
||||
|
||||
|
||||
private final IndexSearcherWrapper indexSearcherWrapper;
|
||||
|
||||
@Inject
|
||||
|
|
|
@ -767,7 +767,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
|
||||
|
||||
public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
|
||||
indicesLifecycle.beforeIndexShardPostRecovery(this);
|
||||
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
|
||||
refresh("percolator_load_queries");
|
||||
try (Engine.Searcher searcher = engine().acquireSearcher("percolator_load_queries")) {
|
||||
|
@ -787,7 +786,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
|
|||
recoveryState.setStage(RecoveryState.Stage.DONE);
|
||||
changeState(IndexShardState.POST_RECOVERY, reason);
|
||||
}
|
||||
indicesLifecycle.afterIndexShardPostRecovery(this);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -97,17 +97,6 @@ public interface IndicesLifecycle {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Called right after the shard is moved into POST_RECOVERY mode
|
||||
*/
|
||||
public void afterIndexShardPostRecovery(IndexShard indexShard) {}
|
||||
|
||||
/**
|
||||
* Called right before the shard is moved into POST_RECOVERY mode.
|
||||
* The shard is ready to be used but not yet marked as POST_RECOVERY.
|
||||
*/
|
||||
public void beforeIndexShardPostRecovery(IndexShard indexShard) {}
|
||||
|
||||
/**
|
||||
* Called after the index shard has been started.
|
||||
*/
|
||||
|
|
|
@ -121,28 +121,6 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
|
|||
}
|
||||
}
|
||||
|
||||
public void beforeIndexShardPostRecovery(IndexShard indexShard) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.beforeIndexShardPostRecovery(indexShard);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke before shard post recovery callback", t, indexShard.shardId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void afterIndexShardPostRecovery(IndexShard indexShard) {
|
||||
for (Listener listener : listeners) {
|
||||
try {
|
||||
listener.afterIndexShardPostRecovery(indexShard);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("{} failed to invoke after shard post recovery callback", t, indexShard.shardId());
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void afterIndexShardStarted(IndexShard indexShard) {
|
||||
for (Listener listener : listeners) {
|
||||
|
|
|
@ -327,7 +327,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
// already deleted on us, ignore it
|
||||
continue;
|
||||
}
|
||||
IndexSettingsService indexSettingsService = indexService.injector().getInstance(IndexSettingsService.class);
|
||||
IndexSettingsService indexSettingsService = indexService.settingsService();
|
||||
indexSettingsService.refreshSettings(indexMetaData.settings());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,7 +97,7 @@ public class CodecTests extends ESSingleNodeTestCase {
|
|||
|
||||
private static CodecService createCodecService(Settings settings) {
|
||||
IndexService indexService = createIndex("test", settings);
|
||||
return indexService.injector().getInstance(CodecService.class);
|
||||
return indexService.getIndexServices().getCodecService();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -102,7 +102,7 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
|
|||
Settings settings = Settings.builder().put("index.fielddata.cache", "none").build();
|
||||
indexService = createIndex("test", settings);
|
||||
mapperService = indexService.mapperService();
|
||||
indicesFieldDataCache = indexService.injector().getInstance(IndicesFieldDataCache.class);
|
||||
indicesFieldDataCache = getInstanceFromNode(IndicesFieldDataCache.class);
|
||||
ifdService = indexService.fieldData();
|
||||
// LogByteSizeMP to preserve doc ID order
|
||||
writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy()));
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.indices.IndicesService;
|
|||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeBuilder;
|
||||
import org.elasticsearch.node.internal.InternalSettingsPreparer;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
|
@ -219,10 +220,11 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
|
|||
* Create a new search context.
|
||||
*/
|
||||
protected static SearchContext createSearchContext(IndexService indexService) {
|
||||
BigArrays bigArrays = indexService.injector().getInstance(BigArrays.class);
|
||||
ThreadPool threadPool = indexService.injector().getInstance(ThreadPool.class);
|
||||
PageCacheRecycler pageCacheRecycler = indexService.injector().getInstance(PageCacheRecycler.class);
|
||||
return new TestSearchContext(threadPool, pageCacheRecycler, bigArrays, indexService);
|
||||
BigArrays bigArrays = indexService.getIndexServices().getBigArrays();
|
||||
ThreadPool threadPool = indexService.getIndexServices().getThreadPool();
|
||||
PageCacheRecycler pageCacheRecycler = node().injector().getInstance(PageCacheRecycler.class);
|
||||
ScriptService scriptService = node().injector().getInstance(ScriptService.class);
|
||||
return new TestSearchContext(threadPool, pageCacheRecycler, bigArrays, scriptService, indexService);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1739,7 +1739,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
IndexService indexService = indicesService.indexService(index);
|
||||
if (indexService != null) {
|
||||
assertThat(indexService.settingsService().getSettings().getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, -1), greaterThan(shard));
|
||||
OperationRouting operationRouting = indexService.injector().getInstance(OperationRouting.class);
|
||||
OperationRouting operationRouting = getInstanceFromNode(OperationRouting.class, node);
|
||||
while (true) {
|
||||
String routing = RandomStrings.randomAsciiOfLength(random, 10);
|
||||
final int targetShard = operationRouting.indexShards(clusterService.state(), index, type, null, routing).shardId().getId();
|
||||
|
|
|
@ -85,6 +85,7 @@ public class TestSearchContext extends SearchContext {
|
|||
final IndexShard indexShard;
|
||||
final Counter timeEstimateCounter = Counter.newCounter();
|
||||
final QuerySearchResult queryResult = new QuerySearchResult();
|
||||
ScriptService scriptService;
|
||||
ParsedQuery originalQuery;
|
||||
ParsedQuery postFilter;
|
||||
Query query;
|
||||
|
@ -99,7 +100,7 @@ public class TestSearchContext extends SearchContext {
|
|||
private final long originNanoTime = System.nanoTime();
|
||||
private final Map<String, FetchSubPhaseContext> subPhaseContexts = new HashMap<>();
|
||||
|
||||
public TestSearchContext(ThreadPool threadPool,PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, IndexService indexService) {
|
||||
public TestSearchContext(ThreadPool threadPool,PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, ScriptService scriptService, IndexService indexService) {
|
||||
super(ParseFieldMatcher.STRICT, null);
|
||||
this.pageCacheRecycler = pageCacheRecycler;
|
||||
this.bigArrays = bigArrays.withCircuitBreaking();
|
||||
|
@ -108,6 +109,7 @@ public class TestSearchContext extends SearchContext {
|
|||
this.fixedBitSetFilterCache = indexService.bitsetFilterCache();
|
||||
this.threadPool = threadPool;
|
||||
this.indexShard = indexService.shard(0);
|
||||
this.scriptService = scriptService;
|
||||
}
|
||||
|
||||
public TestSearchContext() {
|
||||
|
@ -119,6 +121,7 @@ public class TestSearchContext extends SearchContext {
|
|||
this.threadPool = null;
|
||||
this.fixedBitSetFilterCache = null;
|
||||
this.indexShard = null;
|
||||
scriptService = null;
|
||||
}
|
||||
|
||||
public void setTypes(String... types) {
|
||||
|
@ -325,7 +328,7 @@ public class TestSearchContext extends SearchContext {
|
|||
|
||||
@Override
|
||||
public ScriptService scriptService() {
|
||||
return indexService.injector().getInstance(ScriptService.class);
|
||||
return scriptService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue