Close query cache on index service creation failure (#48230)
Today it is possible that we create the `QueryCache` and then fail to create the owning `IndexService` and this means we do not close the `QueryCache` again. This commit addresses that leak. Fixes #48186
This commit is contained in:
parent
c903ac2376
commit
3a6fa0bbdb
|
@ -39,8 +39,10 @@ import org.elasticsearch.common.settings.Setting.Property;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||
import org.elasticsearch.index.analysis.IndexAnalyzers;
|
||||
import org.elasticsearch.index.cache.query.DisabledQueryCache;
|
||||
import org.elasticsearch.index.cache.query.IndexQueryCache;
|
||||
import org.elasticsearch.index.cache.query.QueryCache;
|
||||
|
@ -399,22 +401,35 @@ public final class IndexModule {
|
|||
indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get();
|
||||
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
|
||||
final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories);
|
||||
final QueryCache queryCache;
|
||||
if (indexSettings.getValue(INDEX_QUERY_CACHE_ENABLED_SETTING)) {
|
||||
BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = forceQueryCacheProvider.get();
|
||||
if (queryCacheProvider == null) {
|
||||
queryCache = new IndexQueryCache(indexSettings, indicesQueryCache);
|
||||
QueryCache queryCache = null;
|
||||
IndexAnalyzers indexAnalyzers = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
if (indexSettings.getValue(INDEX_QUERY_CACHE_ENABLED_SETTING)) {
|
||||
BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = forceQueryCacheProvider.get();
|
||||
if (queryCacheProvider == null) {
|
||||
queryCache = new IndexQueryCache(indexSettings, indicesQueryCache);
|
||||
} else {
|
||||
queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache);
|
||||
}
|
||||
} else {
|
||||
queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache);
|
||||
queryCache = new DisabledQueryCache(indexSettings);
|
||||
}
|
||||
if (IndexService.needsMapperService(indexSettings, indexCreationContext)) {
|
||||
indexAnalyzers = analysisRegistry.build(indexSettings);
|
||||
}
|
||||
final IndexService indexService = new IndexService(indexSettings, indexCreationContext, environment, xContentRegistry,
|
||||
new SimilarityService(indexSettings, scriptService, similarities), shardStoreDeleter, indexAnalyzers,
|
||||
engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache,
|
||||
directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners,
|
||||
indexOperationListeners, namedWriteableRegistry);
|
||||
success = true;
|
||||
return indexService;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
IOUtils.closeWhileHandlingException(queryCache, indexAnalyzers);
|
||||
}
|
||||
} else {
|
||||
queryCache = new DisabledQueryCache(indexSettings);
|
||||
}
|
||||
return new IndexService(indexSettings, indexCreationContext, environment, xContentRegistry,
|
||||
new SimilarityService(indexSettings, scriptService, similarities),
|
||||
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
|
||||
clusterService, client, queryCache, directoryFactory, eventListener, readerWrapperFactory, mapperRegistry,
|
||||
indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry);
|
||||
}
|
||||
|
||||
private static IndexStorePlugin.DirectoryFactory getDirectoryFactory(
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.elasticsearch.core.internal.io.IOUtils;
|
|||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||
import org.elasticsearch.index.analysis.IndexAnalyzers;
|
||||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
||||
|
@ -148,8 +147,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
NamedXContentRegistry xContentRegistry,
|
||||
SimilarityService similarityService,
|
||||
ShardStoreDeleter shardStoreDeleter,
|
||||
AnalysisRegistry registry,
|
||||
EngineFactory engineFactory,
|
||||
IndexAnalyzers indexAnalyzers, EngineFactory engineFactory,
|
||||
CircuitBreakerService circuitBreakerService,
|
||||
BigArrays bigArrays,
|
||||
ThreadPool threadPool,
|
||||
|
@ -164,24 +162,16 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
IndicesFieldDataCache indicesFieldDataCache,
|
||||
List<SearchOperationListener> searchOperationListeners,
|
||||
List<IndexingOperationListener> indexingOperationListeners,
|
||||
NamedWriteableRegistry namedWriteableRegistry) throws IOException {
|
||||
NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(indexSettings);
|
||||
this.indexSettings = indexSettings;
|
||||
this.xContentRegistry = xContentRegistry;
|
||||
this.similarityService = similarityService;
|
||||
this.namedWriteableRegistry = namedWriteableRegistry;
|
||||
this.circuitBreakerService = circuitBreakerService;
|
||||
if (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE &&
|
||||
indexCreationContext == IndexCreationContext.CREATE_INDEX) { // metadata verification needs a mapper service
|
||||
this.mapperService = null;
|
||||
this.indexFieldData = null;
|
||||
this.indexSortSupplier = () -> null;
|
||||
this.bitsetFilterCache = null;
|
||||
this.warmer = null;
|
||||
this.indexCache = null;
|
||||
} else {
|
||||
this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), xContentRegistry, similarityService,
|
||||
mapperRegistry,
|
||||
if (needsMapperService(indexSettings, indexCreationContext)) {
|
||||
assert indexAnalyzers != null;
|
||||
this.mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
|
||||
// we parse all percolator queries as they would be parsed on shard 0
|
||||
() -> newQueryShardContext(0, null, System::currentTimeMillis, null));
|
||||
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
|
||||
|
@ -199,6 +189,14 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
|
||||
this.warmer = new IndexWarmer(threadPool, indexFieldData, bitsetFilterCache.createListener(threadPool));
|
||||
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
|
||||
} else {
|
||||
assert indexAnalyzers == null;
|
||||
this.mapperService = null;
|
||||
this.indexFieldData = null;
|
||||
this.indexSortSupplier = () -> null;
|
||||
this.bitsetFilterCache = null;
|
||||
this.warmer = null;
|
||||
this.indexCache = null;
|
||||
}
|
||||
|
||||
this.shardStoreDeleter = shardStoreDeleter;
|
||||
|
@ -223,6 +221,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
updateFsyncTaskIfNecessary();
|
||||
}
|
||||
|
||||
static boolean needsMapperService(IndexSettings indexSettings, IndexCreationContext indexCreationContext) {
|
||||
return false == (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE &&
|
||||
indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service
|
||||
}
|
||||
|
||||
public enum IndexCreationContext {
|
||||
CREATE_INDEX,
|
||||
META_DATA_VERIFICATION
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.elasticsearch.index;
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.index.AssertingDirectoryReader;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.FieldInvertState;
|
||||
|
@ -40,12 +41,15 @@ import org.elasticsearch.common.settings.Setting.Property;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
import org.elasticsearch.env.TestEnvironment;
|
||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||
import org.elasticsearch.index.analysis.AnalyzerProvider;
|
||||
import org.elasticsearch.index.analysis.AnalyzerScope;
|
||||
import org.elasticsearch.index.cache.query.DisabledQueryCache;
|
||||
import org.elasticsearch.index.cache.query.IndexQueryCache;
|
||||
import org.elasticsearch.index.cache.query.QueryCache;
|
||||
|
@ -65,6 +69,7 @@ import org.elasticsearch.index.similarity.SimilarityService;
|
|||
import org.elasticsearch.index.store.FsDirectoryFactory;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.analysis.AnalysisModule;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
|
||||
|
@ -84,13 +89,17 @@ import org.hamcrest.Matchers;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
|
@ -174,7 +183,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "foo_store")
|
||||
.build();
|
||||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
|
||||
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = Collections.singletonMap(
|
||||
final Map<String, IndexStorePlugin.DirectoryFactory> indexStoreFactories = singletonMap(
|
||||
"foo_store", new FooFunction());
|
||||
final IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), indexStoreFactories);
|
||||
|
||||
|
@ -354,11 +363,19 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
|
||||
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
|
||||
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
|
||||
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> new CustomQueryCache()));
|
||||
final Set<CustomQueryCache> liveQueryCaches = new HashSet<>();
|
||||
module.forceQueryCacheProvider((a, b) -> {
|
||||
final CustomQueryCache customQueryCache = new CustomQueryCache(liveQueryCaches);
|
||||
liveQueryCaches.add(customQueryCache);
|
||||
return customQueryCache;
|
||||
});
|
||||
expectThrows(AlreadySetException.class, () -> module.forceQueryCacheProvider((a, b) -> {
|
||||
throw new AssertionError("never called");
|
||||
}));
|
||||
IndexService indexService = newIndexService(module);
|
||||
assertTrue(indexService.cache().query() instanceof CustomQueryCache);
|
||||
indexService.close("simon says", false);
|
||||
assertThat(liveQueryCaches, empty());
|
||||
}
|
||||
|
||||
public void testDefaultQueryCacheImplIsSelected() throws IOException {
|
||||
|
@ -379,12 +396,73 @@ public class IndexModuleTests extends ESTestCase {
|
|||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
|
||||
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
|
||||
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache());
|
||||
module.forceQueryCacheProvider((a, b) -> new CustomQueryCache(null));
|
||||
IndexService indexService = newIndexService(module);
|
||||
assertTrue(indexService.cache().query() instanceof DisabledQueryCache);
|
||||
indexService.close("simon says", false);
|
||||
}
|
||||
|
||||
public void testCustomQueryCacheCleanedUpIfIndexServiceCreationFails() {
|
||||
Settings settings = Settings.builder()
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
|
||||
IndexModule module = new IndexModule(indexSettings, emptyAnalysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
|
||||
final Set<CustomQueryCache> liveQueryCaches = new HashSet<>();
|
||||
module.forceQueryCacheProvider((a, b) -> {
|
||||
final CustomQueryCache customQueryCache = new CustomQueryCache(liveQueryCaches);
|
||||
liveQueryCaches.add(customQueryCache);
|
||||
return customQueryCache;
|
||||
});
|
||||
threadPool.shutdown(); // causes index service creation to fail
|
||||
expectThrows(EsRejectedExecutionException.class, () -> newIndexService(module));
|
||||
assertThat(liveQueryCaches, empty());
|
||||
}
|
||||
|
||||
public void testIndexAnalyzersCleanedUpIfIndexServiceCreationFails() {
|
||||
Settings settings = Settings.builder()
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
|
||||
|
||||
final HashSet<Analyzer> openAnalyzers = new HashSet<>();
|
||||
final AnalysisModule.AnalysisProvider<AnalyzerProvider<?>> analysisProvider = (i,e,n,s) -> new AnalyzerProvider<>() {
|
||||
@Override
|
||||
public String name() {
|
||||
return "test";
|
||||
}
|
||||
|
||||
@Override
|
||||
public AnalyzerScope scope() {
|
||||
return AnalyzerScope.INDEX;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Analyzer get() {
|
||||
final Analyzer analyzer = new Analyzer() {
|
||||
@Override
|
||||
protected TokenStreamComponents createComponents(String fieldName) {
|
||||
throw new AssertionError("should not be here");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
super.close();
|
||||
openAnalyzers.remove(this);
|
||||
}
|
||||
};
|
||||
openAnalyzers.add(analyzer);
|
||||
return analyzer;
|
||||
}
|
||||
};
|
||||
final AnalysisRegistry analysisRegistry = new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(),
|
||||
singletonMap("test", analysisProvider), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap());
|
||||
IndexModule module = new IndexModule(indexSettings, analysisRegistry, new InternalEngineFactory(), Collections.emptyMap());
|
||||
threadPool.shutdown(); // causes index service creation to fail
|
||||
expectThrows(EsRejectedExecutionException.class, () -> newIndexService(module));
|
||||
assertThat(openAnalyzers, empty());
|
||||
}
|
||||
|
||||
public void testMmapNotAllowed() {
|
||||
String storeType = randomFrom(IndexModule.Type.HYBRIDFS.getSettingsKey(), IndexModule.Type.MMAPFS.getSettingsKey());
|
||||
final Settings settings = Settings.builder()
|
||||
|
@ -403,12 +481,19 @@ public class IndexModuleTests extends ESTestCase {
|
|||
|
||||
class CustomQueryCache implements QueryCache {
|
||||
|
||||
private final Set<CustomQueryCache> liveQueryCaches;
|
||||
|
||||
CustomQueryCache(Set<CustomQueryCache> liveQueryCaches) {
|
||||
this.liveQueryCaches = liveQueryCaches;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(String reason) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
public void close() {
|
||||
assertTrue(liveQueryCaches == null || liveQueryCaches.remove(this));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue