Internal: IndicesStore shouldn't try to delete index after deleting a shard
When a node discovers shard content on disk which isn't used, we reach out to all other nodes that supposed to have the shard active. Only once all of those have confirmed the shard active, the shard has no unassigned copies *and* no cluster state change have happened in the mean while, do we go and delete the shard folder. Currently, after removing a shard, the IndicesStores checks the indices services if that has no more shard active for this index and if so, it tries to delete the entire index folder (unless on master node, where we keep the index metadata around). This is wrong as both the check and the protections in IndicesServices.deleteIndexStore make sure that there isn't any shard *in use* from that index. However, it may be the we erroneously delete other unused shard copies on disk, without the proper safety guards described above. Normally, this is not a problem as the missing copy will be recovered from another shard copy on another node (although a shame). However, in extremely rare cases involving multiple node failures/restarts where all shard copies are not available (i.e., shard is red) there are race conditions which can cause all shard copies to be deleted. Instead, we should change the decision to clean up an index folder to based on checking the index directory for being empty and containing no shards.
This commit is contained in:
parent
15ea07f8c0
commit
616941e902
|
@ -22,6 +22,7 @@ package org.elasticsearch.env;
|
|||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.store.*;
|
||||
|
@ -29,6 +30,7 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -661,6 +663,56 @@ public class NodeEnvironment extends AbstractComponent implements Closeable {
|
|||
return indices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to find all allocated shards for the given index
|
||||
* on the current node. NOTE: This methods is prone to race-conditions on the filesystem layer since it might not
|
||||
* see directories created concurrently or while it's traversing.
|
||||
* @param index the index to filter shards
|
||||
* @return a set of shard IDs
|
||||
* @throws IOException if an IOException occurs
|
||||
*/
|
||||
public Set<ShardId> findAllShardIds(final Index index) throws IOException {
|
||||
assert index != null;
|
||||
if (nodePaths == null || locks == null) {
|
||||
throw new IllegalStateException("node is not configured to store local location");
|
||||
}
|
||||
assert assertEnvIsLocked();
|
||||
final Set<ShardId> shardIds = Sets.newHashSet();
|
||||
String indexName = index.name();
|
||||
for (final NodePath nodePath : nodePaths) {
|
||||
Path location = nodePath.indicesPath;
|
||||
if (Files.isDirectory(location)) {
|
||||
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(location)) {
|
||||
for (Path indexPath : indexStream) {
|
||||
if (indexName.equals(indexPath.getFileName().toString())) {
|
||||
shardIds.addAll(findAllShardsForIndex(indexPath));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return shardIds;
|
||||
}
|
||||
|
||||
private static Set<ShardId> findAllShardsForIndex(Path indexPath) throws IOException {
|
||||
Set<ShardId> shardIds = new HashSet<>();
|
||||
if (Files.isDirectory(indexPath)) {
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
|
||||
String currentIndex = indexPath.getFileName().toString();
|
||||
for (Path shardPath : stream) {
|
||||
if (Files.isDirectory(shardPath)) {
|
||||
Integer shardId = Ints.tryParse(shardPath.getFileName().toString());
|
||||
if (shardId != null) {
|
||||
ShardId id = new ShardId(currentIndex, shardId);
|
||||
shardIds.add(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return shardIds;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (closed.compareAndSet(false, true) && locks != null) {
|
||||
|
|
|
@ -524,18 +524,38 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
|
|||
* This method deletes the shard contents on disk for the given shard ID. This method will fail if the shard deleting
|
||||
* is prevented by {@link #canDeleteShardContent(org.elasticsearch.index.shard.ShardId, org.elasticsearch.cluster.metadata.IndexMetaData)}
|
||||
* of if the shards lock can not be acquired.
|
||||
*
|
||||
* On data nodes, if the deleted shard is the last shard folder in its index, the method will attempt to remove the index folder as well.
|
||||
*
|
||||
* @param reason the reason for the shard deletion
|
||||
* @param shardId the shards ID to delete
|
||||
* @param metaData the shards index metadata. This is required to access the indexes settings etc.
|
||||
* @param clusterState . This is required to access the indexes settings etc.
|
||||
* @throws IOException if an IOException occurs
|
||||
*/
|
||||
public void deleteShardStore(String reason, ShardId shardId, IndexMetaData metaData) throws IOException {
|
||||
public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState) throws IOException {
|
||||
final IndexMetaData metaData = clusterState.getMetaData().indices().get(shardId.getIndex());
|
||||
|
||||
final Settings indexSettings = buildIndexSettings(metaData);
|
||||
if (canDeleteShardContent(shardId, indexSettings) == false) {
|
||||
throw new IllegalStateException("Can't delete shard " + shardId);
|
||||
}
|
||||
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
|
||||
logger.trace("{} deleting shard reason [{}]", shardId, reason);
|
||||
logger.debug("{} deleted shard reason [{}]", shardId, reason);
|
||||
|
||||
if (clusterState.nodes().localNode().isMasterNode() == false && // master nodes keep the index meta data, even if having no shards..
|
||||
canDeleteIndexContents(shardId.index(), indexSettings)) {
|
||||
if (nodeEnv.findAllShardIds(shardId.index()).isEmpty()) {
|
||||
try {
|
||||
// note that deleteIndexStore have more safety checks and may throw an exception if index was concurrently created.
|
||||
deleteIndexStore("no longer used", metaData, clusterState);
|
||||
} catch (Exception e) {
|
||||
// wrap the exception to indicate we already deleted the shard
|
||||
throw new ElasticsearchException("failed to delete unused index after deleting its last shard (" + shardId + ")", e);
|
||||
}
|
||||
} else {
|
||||
logger.trace("[{}] still has shard stores, leaving as is", shardId.index());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.indices.store;
|
|||
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.elasticsearch.cluster.*;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
|
@ -288,28 +287,18 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
return;
|
||||
}
|
||||
|
||||
clusterService.submitStateUpdateTask("indices_store", new ClusterStateNonMasterUpdateTask() {
|
||||
clusterService.submitStateUpdateTask("indices_store ([" + shardId + "] active fully on other nodes)", new ClusterStateNonMasterUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
if (clusterState.getVersion() != currentState.getVersion()) {
|
||||
logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion());
|
||||
return currentState;
|
||||
}
|
||||
IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex());
|
||||
try {
|
||||
indicesService.deleteShardStore("no longer used", shardId, indexMeta);
|
||||
indicesService.deleteShardStore("no longer used", shardId, currentState);
|
||||
} catch (Throwable ex) {
|
||||
logger.debug("{} failed to delete unallocated shard, ignoring", ex, shardId);
|
||||
}
|
||||
// if the index doesn't exists anymore, delete its store as well, but only if its a non master node, since master
|
||||
// nodes keep the index metadata around
|
||||
if (indicesService.hasIndex(shardId.getIndex()) == false && currentState.nodes().localNode().masterNode() == false) {
|
||||
try {
|
||||
indicesService.deleteIndexStore("no longer used", indexMeta, currentState);
|
||||
} catch (Throwable ex) {
|
||||
logger.debug("{} failed to delete unallocated index, ignoring", ex, shardId.getIndex());
|
||||
}
|
||||
}
|
||||
return currentState;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.indices.store;
|
|||
|
||||
import com.google.common.base.Predicate;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -30,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.routing.*;
|
||||
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -37,12 +39,14 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.recovery.RecoverySource;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
|
||||
import org.elasticsearch.test.disruption.SingleNodeDisruption;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportModule;
|
||||
import org.elasticsearch.transport.TransportRequestOptions;
|
||||
|
@ -55,6 +59,7 @@ import java.nio.file.Path;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
|
@ -217,6 +222,87 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
|
|||
assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@TestLogging("cluster.service:TRACE")
|
||||
public void testShardActiveElsewhereDoesNotDeleteAnother() throws Exception {
|
||||
Future<String> masterFuture = internalCluster().startNodeAsync(
|
||||
Settings.builder().put("node.master", true, "node.data", false).build());
|
||||
Future<List<String>> nodesFutures = internalCluster().startNodesAsync(4,
|
||||
Settings.builder().put("node.master", false, "node.data", true).build());
|
||||
|
||||
final String masterNode = masterFuture.get();
|
||||
final String node1 = nodesFutures.get().get(0);
|
||||
final String node2 = nodesFutures.get().get(1);
|
||||
final String node3 = nodesFutures.get().get(2);
|
||||
// we will use this later on, handy to start now to make sure it has a different data folder that node 1,2 &3
|
||||
final String node4 = nodesFutures.get().get(3);
|
||||
|
||||
assertAcked(prepareCreate("test").setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.put(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "_name", node4)
|
||||
));
|
||||
assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).setWaitForGreenStatus().setWaitForNodes("5").get().isTimedOut());
|
||||
|
||||
// disable allocation to control the situation more easily
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "none")));
|
||||
|
||||
logger.debug("--> shutting down two random nodes");
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1, node2, node3));
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1, node2, node3));
|
||||
|
||||
logger.debug("--> verifying index is red");
|
||||
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
|
||||
if (health.getStatus() != ClusterHealthStatus.RED) {
|
||||
logClusterState();
|
||||
fail("cluster didn't become red, despite of shutting 2 of 3 nodes");
|
||||
}
|
||||
|
||||
logger.debug("--> allowing index to be assigned to node [{}]", node4);
|
||||
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(
|
||||
Settings.builder()
|
||||
.put(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "_name", "NONE")));
|
||||
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all")));
|
||||
|
||||
logger.debug("--> waiting for shards to recover on [{}]", node4);
|
||||
// we have to do this in two steps as we now do async shard fetching before assigning, so the change to the
|
||||
// allocation filtering may not have immediate effect
|
||||
// TODO: we should add an easier to do this. It's too much of a song and dance..
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
assertTrue(internalCluster().getInstance(IndicesService.class, node4).hasIndex("test"));
|
||||
}
|
||||
});
|
||||
|
||||
// wait for 4 active shards - we should have lost one shard
|
||||
assertFalse(client().admin().cluster().prepareHealth().setWaitForActiveShards(4).get().isTimedOut());
|
||||
|
||||
// disable allocation again to control concurrency a bit and allow shard active to kick in before allocation
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "none")));
|
||||
|
||||
logger.debug("--> starting the two old nodes back");
|
||||
|
||||
internalCluster().startNodesAsync(2,
|
||||
Settings.builder().put("node.master", false, "node.data", true).build());
|
||||
|
||||
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("5").get().isTimedOut());
|
||||
|
||||
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
|
||||
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all")));
|
||||
|
||||
logger.debug("--> waiting for the lost shard to be recovered");
|
||||
|
||||
ensureGreen("test");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Slow
|
||||
public void testShardActiveElseWhere() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue