No mapper service and index caches for replicated closed indices (#40423)

Replicated closed indices can't be indexed into or searched, and therefore don't need a shard with
full indexing and search capabilities allocated. We can save on a lot of heap memory for those
indices by not allocating a mapper service and caching infrastructure (which preallocates a constant
amount per instance). Before this change, a 1GB ES instance could host 250 replicated closed
metricbeat indices (each index with one shard). After this change, the same instance can host 7300
replicated closed metricbeat instances (not that this would be a recommended configuration). Most
of the remaining memory is in the cluster state and the IndexSettings object.
This commit is contained in:
Yannick Welsch 2019-03-27 18:49:03 +01:00
parent 8f7c5732f1
commit 64b31f44af
7 changed files with 58 additions and 30 deletions

View File

@ -366,6 +366,7 @@ public final class IndexModule {
}
public IndexService newIndexService(
IndexService.IndexCreationContext indexCreationContext,
NodeEnvironment environment,
NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter,
@ -395,7 +396,7 @@ public final class IndexModule {
} else {
queryCache = new DisabledQueryCache(indexSettings);
}
return new IndexService(indexSettings, environment, xContentRegistry,
return new IndexService(indexSettings, indexCreationContext, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,

View File

@ -136,6 +136,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
public IndexService(
IndexSettings indexSettings,
IndexCreationContext indexCreationContext,
NodeEnvironment nodeEnv,
NamedXContentRegistry xContentRegistry,
SimilarityService similarityService,
@ -162,21 +163,36 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.similarityService = similarityService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.circuitBreakerService = circuitBreakerService;
this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), xContentRegistry, similarityService,
mapperRegistry,
// we parse all percolator queries as they would be parsed on shard 0
() -> newQueryShardContext(0, null, System::currentTimeMillis, null));
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
if (indexSettings.getIndexSortConfig().hasIndexSort()) {
// we delay the actual creation of the sort order for this index because the mapping has not been merged yet.
// The sort order is validated right after the merge of the mapping later in the process.
this.indexSortSupplier = () -> indexSettings.getIndexSortConfig().buildIndexSort(
mapperService::fullName,
indexFieldData::getForField
);
} else {
if (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE &&
indexCreationContext == IndexCreationContext.CREATE_INDEX) { // metadata verification needs a mapper service
this.mapperService = null;
this.indexFieldData = null;
this.indexSortSupplier = () -> null;
this.bitsetFilterCache = null;
this.warmer = null;
this.indexCache = null;
} else {
this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), xContentRegistry, similarityService,
mapperRegistry,
// we parse all percolator queries as they would be parsed on shard 0
() -> newQueryShardContext(0, null, System::currentTimeMillis, null));
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
if (indexSettings.getIndexSortConfig().hasIndexSort()) {
// we delay the actual creation of the sort order for this index because the mapping has not been merged yet.
// The sort order is validated right after the merge of the mapping later in the process.
this.indexSortSupplier = () -> indexSettings.getIndexSortConfig().buildIndexSort(
mapperService::fullName,
indexFieldData::getForField
);
} else {
this.indexSortSupplier = () -> null;
}
indexFieldData.setListener(new FieldDataCacheListener(this));
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
this.warmer = new IndexWarmer(threadPool, indexFieldData, bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
}
this.shardStoreDeleter = shardStoreDeleter;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
@ -185,10 +201,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.indexStore = indexStore;
indexFieldData.setListener(new FieldDataCacheListener(this));
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
this.warmer = new IndexWarmer(threadPool, indexFieldData, bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
this.engineFactory = Objects.requireNonNull(engineFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
@ -202,6 +214,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
updateFsyncTaskIfNecessary();
}
public enum IndexCreationContext {
CREATE_INDEX,
META_DATA_VERIFICATION
}
public int numberOfShards() {
return shards.size();
}
@ -548,7 +565,10 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
@Override
public boolean updateMapping(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) throws IOException {
return mapperService().updateMapping(currentIndexMetaData, newIndexMetaData);
if (mapperService == null) {
return false;
}
return mapperService.updateMapping(currentIndexMetaData, newIndexMetaData);
}
private class StoreCloseListener implements Store.OnClose {

View File

@ -2493,8 +2493,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig,
mapperService != null ? mapperService.indexAnalyzer() : null,
similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
@ -3077,7 +3078,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
final RootObjectMapper.Builder noopRootMapper = new RootObjectMapper.Builder("__noop");
final DocumentMapper noopDocumentMapper = new DocumentMapper.Builder(noopRootMapper, mapperService).build(mapperService);
final DocumentMapper noopDocumentMapper = mapperService != null ?
new DocumentMapper.Builder(noopRootMapper, mapperService).build(mapperService) :
null;
return new EngineConfig.TombstoneDocSupplier() {
@Override
public ParsedDocument newDeleteTombstoneDoc(String type, String id) {

View File

@ -157,6 +157,8 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.elasticsearch.index.IndexService.IndexCreationContext.META_DATA_VERIFICATION;
import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
public class IndicesService extends AbstractLifecycleComponent
@ -492,7 +494,7 @@ public class IndicesService extends AbstractLifecycleComponent
finalListeners.add(oldShardsStats);
final IndexService indexService =
createIndexService(
"create index",
CREATE_INDEX,
indexMetaData,
indicesQueryCache,
indicesFieldDataCache,
@ -514,7 +516,7 @@ public class IndicesService extends AbstractLifecycleComponent
/**
* This creates a new IndexService without registering it
*/
private synchronized IndexService createIndexService(final String reason,
private synchronized IndexService createIndexService(IndexService.IndexCreationContext indexCreationContext,
IndexMetaData indexMetaData,
IndicesQueryCache indicesQueryCache,
IndicesFieldDataCache indicesFieldDataCache,
@ -532,7 +534,7 @@ public class IndicesService extends AbstractLifecycleComponent
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
reason);
indexCreationContext);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), indexStoreFactories);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
@ -543,6 +545,7 @@ public class IndicesService extends AbstractLifecycleComponent
indexModule.addIndexEventListener(listener);
}
return indexModule.newIndexService(
indexCreationContext,
nodeEnv,
xContentRegistry,
this,
@ -621,7 +624,7 @@ public class IndicesService extends AbstractLifecycleComponent
closeables.add(indicesQueryCache);
// this will also fail if some plugin fails etc. which is nice since we can verify that early
final IndexService service =
createIndexService("metadata verification", metaData, indicesQueryCache, indicesFieldDataCache, emptyList());
createIndexService(META_DATA_VERIFICATION, metaData, indicesQueryCache, indicesFieldDataCache, emptyList());
closeables.add(() -> service.close("metadata verification", false));
service.mapperService().merge(metaData, MapperService.MergeReason.MAPPING_RECOVERY);
if (metaData.equals(metaDataUpdate) == false) {

View File

@ -89,6 +89,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
@ -148,8 +149,8 @@ public class IndexModuleTests extends ESTestCase {
}
private IndexService newIndexService(IndexModule module) throws IOException {
return module.newIndexService(nodeEnvironment, xContentRegistry(), deleter, circuitBreakerService, bigArrays, threadPool,
scriptService, null, indicesQueryCache, mapperRegistry,
return module.newIndexService(CREATE_INDEX, nodeEnvironment, xContentRegistry(), deleter, circuitBreakerService, bigArrays,
threadPool, scriptService, null, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, listener), writableRegistry());
}

View File

@ -1052,7 +1052,7 @@ public abstract class EngineTestCase extends ESTestCase {
* Asserts the provided engine has a consistent document history between translog and Lucene index.
*/
public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
if (mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false
if (mapper == null || mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false
|| (engine instanceof InternalEngine) == false) {
return;
}

View File

@ -687,7 +687,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
throw new IllegalArgumentException("permission filters are not allowed to use the current timestamp");
}, null),
indexService.cache().bitsetFilterCache(),
indexService.cache() != null ? indexService.cache().bitsetFilterCache() : null,
indexService.getThreadPool().getThreadContext(), getLicenseState(),
indexService.getScriptService()));
/* We need to forcefully overwrite the query cache implementation to use security's opt out query cache implementation.