Don't close caches while there might still be in-flight requests. (#38958)
Many of our index components use ref-counting so that in the event that a shard is closed while there are still ongoing requests, then the index reader and the store only effectively get closed when ongoing requests have finished. However we don't apply the same principle to the request and query caches, which might get closed while there are still in-flight requests. This commit adds ref-counting to `IndicesService` so that the caches and other components it maintains only get closed when all shards are effectively closed. Closes #37117
This commit is contained in:
parent
af7b89b80c
commit
45b17e8645
|
@ -248,6 +248,18 @@ final class CompositeIndexEventListener implements IndexEventListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onStoreCreated(ShardId shardId) {
|
||||||
|
for (IndexEventListener listener : listeners) {
|
||||||
|
try {
|
||||||
|
listener.onStoreCreated(shardId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("failed to invoke on store created", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStoreClosed(ShardId shardId) {
|
public void onStoreClosed(ShardId shardId) {
|
||||||
for (IndexEventListener listener : listeners) {
|
for (IndexEventListener listener : listeners) {
|
||||||
|
|
|
@ -388,6 +388,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
||||||
DirectoryService directoryService = indexStore.newDirectoryService(path);
|
DirectoryService directoryService = indexStore.newDirectoryService(path);
|
||||||
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
|
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
|
||||||
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
|
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
|
||||||
|
eventListener.onStoreCreated(shardId);
|
||||||
indexShard = new IndexShard(
|
indexShard = new IndexShard(
|
||||||
routing,
|
routing,
|
||||||
this.indexSettings,
|
this.indexSettings,
|
||||||
|
|
|
@ -160,6 +160,13 @@ public interface IndexEventListener {
|
||||||
default void beforeIndexAddedToCluster(Index index, Settings indexSettings) {
|
default void beforeIndexAddedToCluster(Index index, Settings indexSettings) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called when the given shards store is created. The shard store is created before the shard is created.
|
||||||
|
*
|
||||||
|
* @param shardId the shard ID the store belongs to
|
||||||
|
*/
|
||||||
|
default void onStoreCreated(ShardId shardId) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when the given shards store is closed. The store is closed once all resource have been released on the store.
|
* Called when the given shards store is closed. The store is closed once all resource have been released on the store.
|
||||||
* This implies that all index readers are closed and no recoveries are running.
|
* This implies that all index readers are closed and no recoveries are running.
|
||||||
|
|
|
@ -62,6 +62,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
|
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.common.util.iterable.Iterables;
|
import org.elasticsearch.common.util.iterable.Iterables;
|
||||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||||
|
@ -125,6 +126,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -195,6 +197,7 @@ public class IndicesService extends AbstractLifecycleComponent
|
||||||
private final MetaStateService metaStateService;
|
private final MetaStateService metaStateService;
|
||||||
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
|
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
|
||||||
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
|
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
|
||||||
|
final AbstractRefCounted indicesRefCount; // pkg-private for testing
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStart() {
|
protected void doStart() {
|
||||||
|
@ -250,6 +253,27 @@ public class IndicesService extends AbstractLifecycleComponent
|
||||||
}
|
}
|
||||||
|
|
||||||
this.indexStoreFactories = indexStoreFactories;
|
this.indexStoreFactories = indexStoreFactories;
|
||||||
|
// doClose() is called when shutting down a node, yet there might still be ongoing requests
|
||||||
|
// that we need to wait for before closing some resources such as the caches. In order to
|
||||||
|
// avoid closing these resources while ongoing requests are still being processed, we use a
|
||||||
|
// ref count which will only close them when both this service and all index services are
|
||||||
|
// actually closed
|
||||||
|
indicesRefCount = new AbstractRefCounted("indices") {
|
||||||
|
@Override
|
||||||
|
protected void closeInternal() {
|
||||||
|
try {
|
||||||
|
IOUtils.close(
|
||||||
|
analysisRegistry,
|
||||||
|
indexingMemoryController,
|
||||||
|
indicesFieldDataCache,
|
||||||
|
cacheCleaner,
|
||||||
|
indicesRequestCache,
|
||||||
|
indicesQueryCache);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -281,14 +305,8 @@ public class IndicesService extends AbstractLifecycleComponent
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() {
|
protected void doClose() throws IOException {
|
||||||
IOUtils.closeWhileHandlingException(
|
indicesRefCount.decRef();
|
||||||
analysisRegistry,
|
|
||||||
indexingMemoryController,
|
|
||||||
indicesFieldDataCache,
|
|
||||||
cacheCleaner,
|
|
||||||
indicesRequestCache,
|
|
||||||
indicesQueryCache);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -456,9 +474,17 @@ public class IndicesService extends AbstractLifecycleComponent
|
||||||
}
|
}
|
||||||
List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
|
List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
|
||||||
final IndexEventListener onStoreClose = new IndexEventListener() {
|
final IndexEventListener onStoreClose = new IndexEventListener() {
|
||||||
|
@Override
|
||||||
|
public void onStoreCreated(ShardId shardId) {
|
||||||
|
indicesRefCount.incRef();
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public void onStoreClosed(ShardId shardId) {
|
public void onStoreClosed(ShardId shardId) {
|
||||||
indicesQueryCache.onClose(shardId);
|
try {
|
||||||
|
indicesRefCount.decRef();
|
||||||
|
} finally {
|
||||||
|
indicesQueryCache.onClose(shardId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
finalListeners.add(onStoreClose);
|
finalListeners.add(onStoreClose);
|
||||||
|
|
|
@ -81,7 +81,7 @@ public class PreBuiltCacheFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<T> values() {
|
public Collection<T> values() {
|
||||||
return Collections.singleton(model);
|
return model == null ? Collections.emptySet() : Collections.singleton(model);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.indices;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
|
import org.elasticsearch.env.Environment;
|
||||||
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
|
import org.elasticsearch.index.IndexService;
|
||||||
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
|
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||||
|
import org.elasticsearch.node.MockNode;
|
||||||
|
import org.elasticsearch.node.Node;
|
||||||
|
import org.elasticsearch.node.NodeValidationException;
|
||||||
|
import org.elasticsearch.script.ScriptService;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.test.InternalTestCluster;
|
||||||
|
import org.elasticsearch.test.MockHttpTransport;
|
||||||
|
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
|
||||||
|
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
|
||||||
|
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||||
|
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||||
|
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
|
||||||
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
|
|
||||||
|
public class IndicesServiceCloseTests extends ESTestCase {
|
||||||
|
|
||||||
|
private Node startNode() throws NodeValidationException {
|
||||||
|
final Path tempDir = createTempDir();
|
||||||
|
String nodeName = "node_s_0";
|
||||||
|
Settings settings = Settings.builder()
|
||||||
|
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", random().nextLong()))
|
||||||
|
.put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
|
||||||
|
.put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo"))
|
||||||
|
.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), createTempDir().getParent())
|
||||||
|
.put(Node.NODE_NAME_SETTING.getKey(), nodeName)
|
||||||
|
.put(ScriptService.SCRIPT_MAX_COMPILATIONS_RATE.getKey(), "1000/1m")
|
||||||
|
.put(EsExecutors.PROCESSORS_SETTING.getKey(), 1) // limit the number of threads created
|
||||||
|
.put("transport.type", getTestTransportType())
|
||||||
|
.put(Node.NODE_DATA_SETTING.getKey(), true)
|
||||||
|
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
|
||||||
|
// default the watermarks low values to prevent tests from failing on nodes without enough disk space
|
||||||
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1b")
|
||||||
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "1b")
|
||||||
|
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1b")
|
||||||
|
// turning on the real memory circuit breaker leads to spurious test failures. As have no full control over heap usage, we
|
||||||
|
// turn it off for these tests.
|
||||||
|
.put(HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING.getKey(), false)
|
||||||
|
.putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes
|
||||||
|
.putList(INITIAL_MASTER_NODES_SETTING.getKey(), nodeName)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Node node = new MockNode(settings, Arrays.asList(MockNioTransportPlugin.class, MockHttpTransport.TestPlugin.class), true);
|
||||||
|
node.start();
|
||||||
|
return node;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCloseEmptyIndicesService() throws Exception {
|
||||||
|
Node node = startNode();
|
||||||
|
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
|
||||||
|
assertEquals(1, indicesService.indicesRefCount.refCount());
|
||||||
|
node.close();
|
||||||
|
assertEquals(0, indicesService.indicesRefCount.refCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCloseNonEmptyIndicesService() throws Exception {
|
||||||
|
Node node = startNode();
|
||||||
|
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
|
||||||
|
assertEquals(1, indicesService.indicesRefCount.refCount());
|
||||||
|
|
||||||
|
assertAcked(node.client().admin().indices().prepareCreate("test")
|
||||||
|
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));
|
||||||
|
|
||||||
|
assertEquals(2, indicesService.indicesRefCount.refCount());
|
||||||
|
|
||||||
|
node.close();
|
||||||
|
assertEquals(0, indicesService.indicesRefCount.refCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCloseWhileOngoingRequest() throws Exception {
|
||||||
|
Node node = startNode();
|
||||||
|
IndicesService indicesService = node.injector().getInstance(IndicesService.class);
|
||||||
|
assertEquals(1, indicesService.indicesRefCount.refCount());
|
||||||
|
|
||||||
|
assertAcked(node.client().admin().indices().prepareCreate("test")
|
||||||
|
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));
|
||||||
|
|
||||||
|
assertEquals(2, indicesService.indicesRefCount.refCount());
|
||||||
|
|
||||||
|
IndexService indexService = indicesService.iterator().next();
|
||||||
|
IndexShard shard = indexService.getShard(0);
|
||||||
|
shard.store().incRef();
|
||||||
|
|
||||||
|
node.close();
|
||||||
|
assertEquals(1, indicesService.indicesRefCount.refCount());
|
||||||
|
|
||||||
|
shard.store().decRef();
|
||||||
|
assertEquals(0, indicesService.indicesRefCount.refCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue