Fixes issue with dangling index being deleted instead of re-imported (#19666)

Fixes an issue where a node that receives a cluster state
update with a brand new cluster UUID but without an
initial persistence block could cause indices to be wiped out,
preventing them from being reimported as dangling indices.
This commit only removes the in-memory data structures and
thus, are subsequently reimported as dangling indices.
This commit is contained in:
Ali Beyad 2016-08-04 08:47:46 -04:00 committed by GitHub
parent ede78ad231
commit 8bbc312fdd
4 changed files with 115 additions and 31 deletions

View File

@ -199,10 +199,14 @@ public class ClusterChangedEvent {
return nodesRemoved() || nodesAdded();
}
// Determines whether or not the current cluster state represents an entirely
// different cluster from the previous cluster state, which will happen when a
// master node is elected that has never been part of the cluster before.
private boolean isNewCluster() {
/**
* Determines whether or not the current cluster state represents an entirely
* new cluster, either when a node joins a cluster for the first time or when
* the node receives a cluster state update from a brand new cluster (different
* UUID from the previous cluster), which will happen when a master node is
* elected that has never been part of the cluster before.
*/
public boolean isNewCluster() {
final String prevClusterUUID = previousState.metaData().clusterUUID();
final String currClusterUUID = state.metaData().clusterUUID();
return prevClusterUUID.equals(currClusterUUID) == false;

View File

@ -280,6 +280,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n");
sb.append("version: ").append(version).append("\n");
sb.append("state uuid: ").append(stateUUID).append("\n");
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");

View File

@ -177,7 +177,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
deleteIndices(event); // also deletes shards of deleted indices
removeUnallocatedIndices(state); // also removes shards of removed indices
removeUnallocatedIndices(event); // also removes shards of removed indices
failMissingShards(state);
@ -286,28 +286,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
});
}
}
// delete local indices that do neither exist in previous cluster state nor part of tombstones
for (AllocatedIndex<? extends Shard> indexService : indicesService) {
Index index = indexService.index();
IndexMetaData indexMetaData = event.state().metaData().index(index);
if (indexMetaData == null) {
assert false : "index" + index + " exists locally, doesn't have a metadata but is not part"
+ " of the delete index list. \nprevious state: " + event.previousState().prettyPrint()
+ "\n current state:\n" + event.state().prettyPrint();
logger.warn("[{}] isn't part of metadata but is part of in memory structures. removing", index);
indicesService.deleteIndex(index, "isn't part of metadata (explicit check)");
}
}
}
/**
* Removes indices that have no shards allocated to this node. This does not delete the shard data as we wait for enough
* shard copies to exist in the cluster before deleting shard data (triggered by {@link org.elasticsearch.indices.store.IndicesStore}).
*
* @param state new cluster state
* @param event the cluster changed event
*/
private void removeUnallocatedIndices(final ClusterState state) {
private void removeUnallocatedIndices(final ClusterChangedEvent event) {
final ClusterState state = event.state();
final String localNodeId = state.nodes().getLocalNodeId();
assert localNodeId != null;
@ -322,6 +310,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
for (AllocatedIndex<? extends Shard> indexService : indicesService) {
Index index = indexService.index();
if (indicesWithShards.contains(index) == false) {
// if the cluster change indicates a brand new cluster, we only want
// to remove the in-memory structures for the index and not delete the
// contents on disk because the index will later be re-imported as a
// dangling index
assert state.metaData().index(index) != null || event.isNewCluster() :
"index " + index + " does not exist in the cluster state, it should either " +
"have been deleted or the cluster must be new";
logger.debug("{} removing index, no shards allocated", index);
indicesService.removeIndex(index, "removing index (no shards allocated)");
}

View File

@ -32,20 +32,26 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation.FailedShard;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.recovery.RecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -56,9 +62,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -69,7 +77,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
public void testRandomClusterStateUpdates() {
// we have an IndicesClusterStateService per node in the cluster
final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>();
ClusterState state = randomInitialClusterState(clusterStateServiceMap);
ClusterState state = randomInitialClusterState(clusterStateServiceMap, MockIndicesService::new);
// each of the following iterations represents a new cluster state update processed on all nodes
for (int i = 0; i < 30; i++) {
@ -78,7 +86,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
// calculate new cluster state
for (int j = 0; j < randomInt(3); j++) { // multiple iterations to simulate batching of cluster states
state = randomlyUpdateClusterState(state, clusterStateServiceMap);
state = randomlyUpdateClusterState(state, clusterStateServiceMap, MockIndicesService::new);
}
// apply cluster state to nodes (incl. master)
@ -97,7 +105,65 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
logger.info("Final cluster state: {}", state.prettyPrint());
}
public ClusterState randomInitialClusterState(Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap) {
/**
* This test ensures that when a node joins a brand new cluster (different cluster UUID),
* different from the cluster it was previously a part of, the in-memory index data structures
* are all removed but the on disk contents of those indices remain so that they can later be
* imported as dangling indices. Normally, the first cluster state update that the node
* receives from the new cluster would contain a cluster block that would cause all in-memory
* structures to be removed (see {@link IndicesClusterStateService#clusterChanged(ClusterChangedEvent)}),
* but in the case where the node joined and was a few cluster state updates behind, it would
* not have received the cluster block, in which case we still need to remove the in-memory
* structures while ensuring the data remains on disk. This test executes this particular
* scenario.
*/
public void testJoiningNewClusterOnlyRemovesInMemoryIndexStructures() {
// a cluster state derived from the initial state that includes a created index
String name = "index_" + randomAsciiOfLength(8).toLowerCase(Locale.ROOT);
ShardRoutingState[] replicaStates = new ShardRoutingState[randomIntBetween(0, 3)];
Arrays.fill(replicaStates, ShardRoutingState.INITIALIZING);
ClusterState stateWithIndex = ClusterStateCreationUtils.state(name, randomBoolean(), ShardRoutingState.INITIALIZING, replicaStates);
// the initial state which is derived from the newly created cluster state but doesn't contain the index
ClusterState initialState = ClusterState.builder(stateWithIndex)
.metaData(MetaData.builder(stateWithIndex.metaData()).remove(name))
.routingTable(RoutingTable.builder().build())
.build();
// pick a data node to simulate the adding an index cluster state change event on, that has shards assigned to it
DiscoveryNode node = stateWithIndex.nodes().get(
randomFrom(stateWithIndex.routingTable().index(name).shardsWithState(INITIALIZING)).currentNodeId());
// simulate the cluster state change on the node
ClusterState localState = adaptClusterStateToLocalNode(stateWithIndex, node);
ClusterState previousLocalState = adaptClusterStateToLocalNode(initialState, node);
IndicesClusterStateService indicesCSSvc = createIndicesClusterStateService(RecordingIndicesService::new);
indicesCSSvc.start();
indicesCSSvc.clusterChanged(new ClusterChangedEvent("cluster state change that adds the index", localState, previousLocalState));
// create a new empty cluster state with a brand new cluster UUID
ClusterState newClusterState = ClusterState.builder(initialState)
.metaData(MetaData.builder(initialState.metaData()).clusterUUID(UUIDs.randomBase64UUID()))
.build();
// simulate the cluster state change on the node
localState = adaptClusterStateToLocalNode(newClusterState, node);
previousLocalState = adaptClusterStateToLocalNode(stateWithIndex, node);
indicesCSSvc.clusterChanged(new ClusterChangedEvent("cluster state change with a new cluster UUID (and doesn't contain the index)",
localState, previousLocalState));
// check that in memory data structures have been removed once the new cluster state is applied,
// but the persistent data is still there
RecordingIndicesService indicesService = (RecordingIndicesService) indicesCSSvc.indicesService;
for (IndexMetaData indexMetaData : stateWithIndex.metaData()) {
Index index = indexMetaData.getIndex();
assertNull(indicesService.indexService(index));
assertFalse(indicesService.isDeleted(index));
}
}
public ClusterState randomInitialClusterState(Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap,
Supplier<MockIndicesService> indicesServiceSupplier) {
List<DiscoveryNode> allNodes = new ArrayList<>();
DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master
allNodes.add(localNode);
@ -109,14 +175,15 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
}
ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()]));
// add nodes to clusterStateServiceMap
updateNodes(state, clusterStateServiceMap);
updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
return state;
}
private void updateNodes(ClusterState state, Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap) {
private void updateNodes(ClusterState state, Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap,
Supplier<MockIndicesService> indicesServiceSupplier) {
for (DiscoveryNode node : state.nodes()) {
clusterStateServiceMap.computeIfAbsent(node, discoveryNode -> {
IndicesClusterStateService ics = createIndicesClusterStateService();
IndicesClusterStateService ics = createIndicesClusterStateService(indicesServiceSupplier);
ics.start();
return ics;
});
@ -131,7 +198,8 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
}
public ClusterState randomlyUpdateClusterState(ClusterState state,
Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap) {
Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap,
Supplier<MockIndicesService> indicesServiceSupplier) {
// randomly create new indices (until we have 200 max)
for (int i = 0; i < randomInt(5); i++) {
if (state.metaData().indices().size() > 200) {
@ -229,7 +297,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).put(createNode()).build();
state = ClusterState.builder(state).nodes(newNodes).build();
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave
updateNodes(state, clusterStateServiceMap);
updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
}
} else {
// remove node
@ -239,7 +307,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).remove(discoveryNode.getId()).build();
state = ClusterState.builder(state).nodes(newNodes).build();
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node join
updateNodes(state, clusterStateServiceMap);
updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
}
}
}
@ -263,11 +331,11 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build();
}
private IndicesClusterStateService createIndicesClusterStateService() {
private IndicesClusterStateService createIndicesClusterStateService(final Supplier<MockIndicesService> indicesServiceSupplier) {
final ThreadPool threadPool = mock(ThreadPool.class);
final Executor executor = mock(Executor.class);
when(threadPool.generic()).thenReturn(executor);
final MockIndicesService indicesService = new MockIndicesService();
final MockIndicesService indicesService = indicesServiceSupplier.get();
final TransportService transportService = new TransportService(Settings.EMPTY, null, threadPool);
final ClusterService clusterService = mock(ClusterService.class);
final RepositoriesService repositoriesService = new RepositoriesService(Settings.EMPTY, clusterService,
@ -279,4 +347,20 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
threadPool, recoveryTargetService, shardStateAction, null, repositoriesService, null, null, null, null, null);
}
private class RecordingIndicesService extends MockIndicesService {
private Set<Index> deletedIndices = Collections.emptySet();
@Override
public synchronized void deleteIndex(Index index, String reason) {
super.deleteIndex(index, reason);
Set<Index> newSet = Sets.newHashSet(deletedIndices);
newSet.add(index);
deletedIndices = Collections.unmodifiableSet(newSet);
}
public synchronized boolean isDeleted(Index index) {
return deletedIndices.contains(index);
}
}
}