Cache successful shard deletion checks (#21438)

Each node checks on every cluster state update if there are shards that it can possibly delete from its disk. It decides this by doing a file-system lookup for each shard id that is fully allocated in the cluster. With lots of shards, this amounts to lots of Files.exists() checks, considerably slowing down cluster state updates. This commit adds a caching layer so that the Files.exists() checks can be skipped if not needed.
This commit is contained in:
Yannick Welsch 2016-11-11 10:06:15 +01:00 committed by GitHub
parent df965fc9b3
commit 2d3a52c0f2
4 changed files with 101 additions and 106 deletions

View File

@ -703,8 +703,9 @@ public class IndicesService extends AbstractLifecycleComponent
final IndexMetaData metaData = clusterState.getMetaData().indices().get(shardId.getIndexName());
final IndexSettings indexSettings = buildIndexSettings(metaData);
if (canDeleteShardContent(shardId, indexSettings) == false) {
throw new IllegalStateException("Can't delete shard " + shardId);
ShardDeletionCheckResult shardDeletionCheckResult = canDeleteShardContent(shardId, indexSettings);
if (shardDeletionCheckResult != ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE) {
throw new IllegalStateException("Can't delete shard " + shardId + " (cause: " + shardDeletionCheckResult + ")");
}
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
logger.debug("{} deleted shard reason [{}]", shardId, reason);
@ -785,39 +786,50 @@ public class IndicesService extends AbstractLifecycleComponent
}
/**
* Returns <code>true</code> iff the shards content for the given shard can be deleted.
* This method will return <code>false</code> if:
* <ul>
* <li>if the shard is still allocated / active on this node</li>
* <li>if for instance if the shard is located on shared and should not be deleted</li>
* <li>if the shards data locations do not exists</li>
* </ul>
* result type returned by {@link #canDeleteShardContent signaling different reasons why a shard can / cannot be deleted}
*/
public enum ShardDeletionCheckResult {
FOLDER_FOUND_CAN_DELETE, // shard data exists and can be deleted
STILL_ALLOCATED, // the shard is still allocated / active on this node
NO_FOLDER_FOUND, // the shards data locations do not exist
SHARED_FILE_SYSTEM, // the shard is located on shared and should not be deleted
NO_LOCAL_STORAGE // node does not have local storage (see DiscoveryNode.nodeRequiresLocalStorage)
}
/**
* Returns <code>ShardDeletionCheckResult</code> signaling whether the shards content for the given shard can be deleted.
*
* @param shardId the shard to delete.
* @param indexSettings the shards's relevant {@link IndexSettings}. This is required to access the indexes settings etc.
*/
public boolean canDeleteShardContent(ShardId shardId, IndexSettings indexSettings) {
public ShardDeletionCheckResult canDeleteShardContent(ShardId shardId, IndexSettings indexSettings) {
assert shardId.getIndex().equals(indexSettings.getIndex());
final IndexService indexService = indexService(shardId.getIndex());
if (indexSettings.isOnSharedFilesystem() == false) {
if (nodeEnv.hasNodeFile()) {
final boolean isAllocated = indexService != null && indexService.hasShard(shardId.id());
if (isAllocated) {
return false; // we are allocated - can't delete the shard
return ShardDeletionCheckResult.STILL_ALLOCATED; // we are allocated - can't delete the shard
} else if (indexSettings.hasCustomDataPath()) {
// lets see if it's on a custom path (return false if the shared doesn't exist)
// we don't need to delete anything that is not there
return Files.exists(nodeEnv.resolveCustomLocation(indexSettings, shardId));
return Files.exists(nodeEnv.resolveCustomLocation(indexSettings, shardId)) ?
ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE :
ShardDeletionCheckResult.NO_FOLDER_FOUND;
} else {
// lets see if it's path is available (return false if the shared doesn't exist)
// we don't need to delete anything that is not there
return FileSystemUtils.exists(nodeEnv.availableShardPaths(shardId));
return FileSystemUtils.exists(nodeEnv.availableShardPaths(shardId)) ?
ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE :
ShardDeletionCheckResult.NO_FOLDER_FOUND;
}
}
} else {
return ShardDeletionCheckResult.NO_LOCAL_STORAGE;
}
} else {
logger.trace("{} skipping shard directory deletion due to shadow replicas", shardId);
return ShardDeletionCheckResult.SHARED_FILE_SYSTEM;
}
return false;
}
private IndexSettings buildIndexSettings(IndexMetaData metaData) {
@ -1126,7 +1138,7 @@ public class IndicesService extends AbstractLifecycleComponent
public void loadIntoContext(ShardSearchRequest request, SearchContext context, QueryPhase queryPhase) throws Exception {
assert canCache(request, context);
final DirectoryReader directoryReader = context.searcher().getDirectoryReader();
boolean[] loadedFromCache = new boolean[] { true };
BytesReference bytesReference = cacheShardLevelResult(context.indexShard(), directoryReader, request.cacheKey(), out -> {
queryPhase.execute(context);

View File

@ -31,6 +31,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
@ -63,7 +65,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -80,6 +85,9 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final TransportService transportService;
private final ThreadPool threadPool;
// Cache successful shard deletion checks to prevent unnecessary file system lookups
private final Set<ShardId> folderNotFoundCache = new HashSet<>();
private TimeValue deleteShardTimeout;
@Inject
@ -115,11 +123,31 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
return;
}
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
RoutingTable routingTable = event.state().routingTable();
// remove entries from cache that don't exist in the routing table anymore (either closed or deleted indices)
// - removing shard data of deleted indices is handled by IndicesClusterStateService
// - closed indices don't need to be removed from the cache but we do it anyway for code simplicity
for (Iterator<ShardId> it = folderNotFoundCache.iterator(); it.hasNext(); ) {
ShardId shardId = it.next();
if (routingTable.hasIndex(shardId.getIndex()) == false) {
it.remove();
}
}
// remove entries from cache which are allocated to this node
final String localNodeId = event.state().nodes().getLocalNodeId();
RoutingNode localRoutingNode = event.state().getRoutingNodes().node(localNodeId);
if (localRoutingNode != null) {
for (ShardRouting routing : localRoutingNode) {
folderNotFoundCache.remove(routing.shardId());
}
}
for (IndexRoutingTable indexRoutingTable : routingTable) {
// Note, closed indices will not have any routing information, so won't be deleted
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) {
ShardId shardId = indexShardRoutingTable.shardId();
ShardId shardId = indexShardRoutingTable.shardId();
if (folderNotFoundCache.contains(shardId) == false && shardCanBeDeleted(localNodeId, indexShardRoutingTable)) {
IndexService indexService = indicesService.indexService(indexRoutingTable.getIndex());
final IndexSettings indexSettings;
if (indexService == null) {
@ -128,15 +156,33 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
} else {
indexSettings = indexService.getIndexSettings();
}
if (indicesService.canDeleteShardContent(shardId, indexSettings)) {
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
IndicesService.ShardDeletionCheckResult shardDeletionCheckResult = indicesService.canDeleteShardContent(shardId, indexSettings);
switch (shardDeletionCheckResult) {
case FOLDER_FOUND_CAN_DELETE:
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
break;
case NO_FOLDER_FOUND:
folderNotFoundCache.add(shardId);
break;
case NO_LOCAL_STORAGE:
assert false : "shard deletion only runs on data nodes which always have local storage";
// nothing to do
break;
case STILL_ALLOCATED:
// nothing to do
break;
case SHARED_FILE_SYSTEM:
// nothing to do
break;
default:
assert false : "unknown shard deletion check result: " + shardDeletionCheckResult;
}
}
}
}
}
boolean shardCanBeDeleted(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) {
static boolean shardCanBeDeleted(String localNodeId, IndexShardRoutingTable indexShardRoutingTable) {
// a shard can be deleted if all its copies are active, and its not allocated on this node
if (indexShardRoutingTable.size() == 0) {
// should not really happen, there should always be at least 1 (primary) shard in a
@ -146,27 +192,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
for (ShardRouting shardRouting : indexShardRoutingTable) {
// be conservative here, check on started, not even active
if (!shardRouting.started()) {
if (shardRouting.started() == false) {
return false;
}
// if the allocated or relocation node id doesn't exists in the cluster state it may be a stale node,
// make sure we don't do anything with this until the routing table has properly been rerouted to reflect
// the fact that the node does not exists
DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId());
if (node == null) {
return false;
}
if (shardRouting.relocatingNodeId() != null) {
node = state.nodes().get(shardRouting.relocatingNodeId());
if (node == null) {
return false;
}
}
// check if shard is active on the current node or is getting relocated to the our node
String localNodeId = state.getNodes().getLocalNode().getId();
if (localNodeId.equals(shardRouting.currentNodeId()) || localNodeId.equals(shardRouting.relocatingNodeId())) {
// check if shard is active on the current node
if (localNodeId.equals(shardRouting.currentNodeId())) {
return false;
}
}
@ -179,16 +210,9 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
String indexUUID = indexShardRoutingTable.shardId().getIndex().getUUID();
ClusterName clusterName = state.getClusterName();
for (ShardRouting shardRouting : indexShardRoutingTable) {
// Node can't be null, because otherwise shardCanBeDeleted() would have returned false
assert shardRouting.started() : "expected started shard but was " + shardRouting;
DiscoveryNode currentNode = state.nodes().get(shardRouting.currentNodeId());
assert currentNode != null;
requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId(), deleteShardTimeout)));
if (shardRouting.relocatingNodeId() != null) {
DiscoveryNode relocatingNode = state.nodes().get(shardRouting.relocatingNodeId());
assert relocatingNode != null;
requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId(), deleteShardTimeout)));
}
}
ShardActiveResponseHandler responseHandler = new ShardActiveResponseHandler(indexShardRoutingTable.shardId(), state.getVersion(),

View File

@ -39,6 +39,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService.ShardDeletionCheckResult;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
@ -92,16 +93,19 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
1).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
ShardId shardId = new ShardId(meta.getIndex(), 0);
assertFalse("no shard location", indicesService.canDeleteShardContent(shardId, indexSettings));
assertEquals("no shard location", indicesService.canDeleteShardContent(shardId, indexSettings),
ShardDeletionCheckResult.NO_FOLDER_FOUND);
IndexService test = createIndex("test");
shardId = new ShardId(test.index(), 0);
assertTrue(test.hasShard(0));
assertFalse("shard is allocated", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()));
assertEquals("shard is allocated", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()),
ShardDeletionCheckResult.STILL_ALLOCATED);
test.removeShard(0, "boom");
assertTrue("shard is removed", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()));
assertEquals("shard is removed", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()),
ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE);
ShardId notAllocated = new ShardId(test.index(), 100);
assertFalse("shard that was never on this node should NOT be deletable",
indicesService.canDeleteShardContent(notAllocated, test.getIndexSettings()));
assertEquals("shard that was never on this node should NOT be deletable",
indicesService.canDeleteShardContent(notAllocated, test.getIndexSettings()), ShardDeletionCheckResult.NO_FOLDER_FOUND);
}
public void testDeleteIndexStore() throws Exception {

View File

@ -63,37 +63,11 @@ public class IndicesStoreTests extends ESTestCase {
NOT_STARTED_STATES = set.toArray(new ShardRoutingState[set.size()]);
}
private static ThreadPool threadPool;
private IndicesStore indicesStore;
private DiscoveryNode localNode;
private ClusterService clusterService;
@BeforeClass
public static void beforeClass() {
threadPool = new TestThreadPool("ShardReplicationTests");
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
@Before
public void before() {
public void createLocalNode() {
localNode = new DiscoveryNode("abc", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
clusterService = createClusterService(threadPool);
TransportService transportService = new TransportService(clusterService.getSettings(), null, null,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
indicesStore = new IndicesStore(Settings.EMPTY, null, clusterService, transportService, null);
}
@After
public void tearDown() throws Exception {
super.tearDown();
clusterService.close();
}
public void testShardCanBeDeletedNoShardRouting() throws Exception {
@ -104,7 +78,7 @@ public class IndicesStoreTests extends ESTestCase {
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numShards).numberOfReplicas(numReplicas)));
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", "_na_", 1));
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
assertFalse(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
}
public void testShardCanBeDeletedNoShardStarted() throws Exception {
@ -128,10 +102,11 @@ public class IndicesStoreTests extends ESTestCase {
if (state == ShardRoutingState.UNASSIGNED) {
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
}
routingTable.addShard(TestShardRouting.newShardRouting("test", i, "xyz", null, j == 0, state, unassignedInfo));
routingTable.addShard(TestShardRouting.newShardRouting("test", i, randomBoolean() ? localNode.getId() : randomAsciiOfLength(10), null, j == 0, state, unassignedInfo));
}
}
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
assertFalse(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
}
public void testShardCanBeDeletedShardExistsLocally() throws Exception {
@ -154,27 +129,7 @@ public class IndicesStoreTests extends ESTestCase {
}
// Shard exists locally, can't delete shard
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
}
public void testShardCanBeDeletedNodeNotInList() throws Exception {
int numShards = randomIntBetween(1, 7);
int numReplicas = randomInt(2);
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numShards).numberOfReplicas(numReplicas)));
clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode));
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", "_na_", 1));
for (int i = 0; i < numShards; i++) {
String relocatingNodeId = randomBoolean() ? null : "def";
routingTable.addShard(TestShardRouting.newShardRouting("test", i, "xyz", relocatingNodeId, true, ShardRoutingState.STARTED));
for (int j = 0; j < numReplicas; j++) {
routingTable.addShard(TestShardRouting.newShardRouting("test", i, "xyz", relocatingNodeId, false, ShardRoutingState.STARTED));
}
}
// null node -> false
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
assertFalse(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
}
public void testShardCanBeDeletedNodeVersion() throws Exception {
@ -196,7 +151,7 @@ public class IndicesStoreTests extends ESTestCase {
}
// shard exist on other node (abc)
assertTrue(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
assertTrue(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
}
public void testShardCanBeDeletedRelocatingNode() throws Exception {
@ -221,6 +176,6 @@ public class IndicesStoreTests extends ESTestCase {
}
// shard exist on other node (abc and def)
assertTrue(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
assertTrue(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
}
}