From 8bbc312fdd2442ce5c9ad431ab0c61930069df2f Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Thu, 4 Aug 2016 08:47:46 -0400 Subject: [PATCH] Fixes issue with dangling index being deleted instead of re-imported (#19666) Fixes an issue where a node that receives a cluster state update with a brand new cluster UUID but without an initial persistence block could cause indices to be wiped out, preventing them from being reimported as dangling indices. This commit only removes the in-memory data structures and thus, are subsequently reimported as dangling indices. --- .../cluster/ClusterChangedEvent.java | 12 +- .../elasticsearch/cluster/ClusterState.java | 1 + .../cluster/IndicesClusterStateService.java | 27 ++--- ...ClusterStateServiceRandomUpdatesTests.java | 106 ++++++++++++++++-- 4 files changed, 115 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java b/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java index efd525d313b..e3164eacdbb 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterChangedEvent.java @@ -199,10 +199,14 @@ public class ClusterChangedEvent { return nodesRemoved() || nodesAdded(); } - // Determines whether or not the current cluster state represents an entirely - // different cluster from the previous cluster state, which will happen when a - // master node is elected that has never been part of the cluster before. - private boolean isNewCluster() { + /** + * Determines whether or not the current cluster state represents an entirely + * new cluster, either when a node joins a cluster for the first time or when + * the node receives a cluster state update from a brand new cluster (different + * UUID from the previous cluster), which will happen when a master node is + * elected that has never been part of the cluster before. + */ + public boolean isNewCluster() { final String prevClusterUUID = previousState.metaData().clusterUUID(); final String currClusterUUID = state.metaData().clusterUUID(); return prevClusterUUID.equals(currClusterUUID) == false; diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java index abad2e9a8e4..6745900057d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -280,6 +280,7 @@ public class ClusterState implements ToXContent, Diffable { public String prettyPrint() { StringBuilder sb = new StringBuilder(); + sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n"); sb.append("version: ").append(version).append("\n"); sb.append("state uuid: ").append(stateUUID).append("\n"); sb.append("from_diff: ").append(wasReadFromDiff).append("\n"); diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index fd06175e28c..f01a09d4608 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -177,7 +177,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple deleteIndices(event); // also deletes shards of deleted indices - removeUnallocatedIndices(state); // also removes shards of removed indices + removeUnallocatedIndices(event); // also removes shards of removed indices failMissingShards(state); @@ -286,28 +286,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple }); } } - - // delete local indices that do neither exist in previous cluster state nor part of tombstones - for (AllocatedIndex indexService : indicesService) { - Index index = indexService.index(); - IndexMetaData indexMetaData = event.state().metaData().index(index); - if (indexMetaData == null) { - assert false : "index" + index + " exists locally, doesn't have a metadata but is not part" - + " of the delete index list. \nprevious state: " + event.previousState().prettyPrint() - + "\n current state:\n" + event.state().prettyPrint(); - logger.warn("[{}] isn't part of metadata but is part of in memory structures. removing", index); - indicesService.deleteIndex(index, "isn't part of metadata (explicit check)"); - } - } } /** * Removes indices that have no shards allocated to this node. This does not delete the shard data as we wait for enough * shard copies to exist in the cluster before deleting shard data (triggered by {@link org.elasticsearch.indices.store.IndicesStore}). * - * @param state new cluster state + * @param event the cluster changed event */ - private void removeUnallocatedIndices(final ClusterState state) { + private void removeUnallocatedIndices(final ClusterChangedEvent event) { + final ClusterState state = event.state(); final String localNodeId = state.nodes().getLocalNodeId(); assert localNodeId != null; @@ -322,6 +310,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple for (AllocatedIndex indexService : indicesService) { Index index = indexService.index(); if (indicesWithShards.contains(index) == false) { + // if the cluster change indicates a brand new cluster, we only want + // to remove the in-memory structures for the index and not delete the + // contents on disk because the index will later be re-imported as a + // dangling index + assert state.metaData().index(index) != null || event.isNewCluster() : + "index " + index + " does not exist in the cluster state, it should either " + + "have been deleted or the cluster must be new"; logger.debug("{} removing index, no shards allocated", index); indicesService.removeIndex(index, "removing index (no shards allocated)"); } diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index c2ccb9cd4ab..4477974d118 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -32,20 +32,26 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation.FailedShard; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.Index; import org.elasticsearch.indices.recovery.RecoveryTargetService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -56,9 +62,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Executor; +import java.util.function.Supplier; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -69,7 +77,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice public void testRandomClusterStateUpdates() { // we have an IndicesClusterStateService per node in the cluster final Map clusterStateServiceMap = new HashMap<>(); - ClusterState state = randomInitialClusterState(clusterStateServiceMap); + ClusterState state = randomInitialClusterState(clusterStateServiceMap, MockIndicesService::new); // each of the following iterations represents a new cluster state update processed on all nodes for (int i = 0; i < 30; i++) { @@ -78,7 +86,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice // calculate new cluster state for (int j = 0; j < randomInt(3); j++) { // multiple iterations to simulate batching of cluster states - state = randomlyUpdateClusterState(state, clusterStateServiceMap); + state = randomlyUpdateClusterState(state, clusterStateServiceMap, MockIndicesService::new); } // apply cluster state to nodes (incl. master) @@ -97,7 +105,65 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice logger.info("Final cluster state: {}", state.prettyPrint()); } - public ClusterState randomInitialClusterState(Map clusterStateServiceMap) { + /** + * This test ensures that when a node joins a brand new cluster (different cluster UUID), + * different from the cluster it was previously a part of, the in-memory index data structures + * are all removed but the on disk contents of those indices remain so that they can later be + * imported as dangling indices. Normally, the first cluster state update that the node + * receives from the new cluster would contain a cluster block that would cause all in-memory + * structures to be removed (see {@link IndicesClusterStateService#clusterChanged(ClusterChangedEvent)}), + * but in the case where the node joined and was a few cluster state updates behind, it would + * not have received the cluster block, in which case we still need to remove the in-memory + * structures while ensuring the data remains on disk. This test executes this particular + * scenario. + */ + public void testJoiningNewClusterOnlyRemovesInMemoryIndexStructures() { + // a cluster state derived from the initial state that includes a created index + String name = "index_" + randomAsciiOfLength(8).toLowerCase(Locale.ROOT); + ShardRoutingState[] replicaStates = new ShardRoutingState[randomIntBetween(0, 3)]; + Arrays.fill(replicaStates, ShardRoutingState.INITIALIZING); + ClusterState stateWithIndex = ClusterStateCreationUtils.state(name, randomBoolean(), ShardRoutingState.INITIALIZING, replicaStates); + + // the initial state which is derived from the newly created cluster state but doesn't contain the index + ClusterState initialState = ClusterState.builder(stateWithIndex) + .metaData(MetaData.builder(stateWithIndex.metaData()).remove(name)) + .routingTable(RoutingTable.builder().build()) + .build(); + + // pick a data node to simulate the adding an index cluster state change event on, that has shards assigned to it + DiscoveryNode node = stateWithIndex.nodes().get( + randomFrom(stateWithIndex.routingTable().index(name).shardsWithState(INITIALIZING)).currentNodeId()); + + // simulate the cluster state change on the node + ClusterState localState = adaptClusterStateToLocalNode(stateWithIndex, node); + ClusterState previousLocalState = adaptClusterStateToLocalNode(initialState, node); + IndicesClusterStateService indicesCSSvc = createIndicesClusterStateService(RecordingIndicesService::new); + indicesCSSvc.start(); + indicesCSSvc.clusterChanged(new ClusterChangedEvent("cluster state change that adds the index", localState, previousLocalState)); + + // create a new empty cluster state with a brand new cluster UUID + ClusterState newClusterState = ClusterState.builder(initialState) + .metaData(MetaData.builder(initialState.metaData()).clusterUUID(UUIDs.randomBase64UUID())) + .build(); + + // simulate the cluster state change on the node + localState = adaptClusterStateToLocalNode(newClusterState, node); + previousLocalState = adaptClusterStateToLocalNode(stateWithIndex, node); + indicesCSSvc.clusterChanged(new ClusterChangedEvent("cluster state change with a new cluster UUID (and doesn't contain the index)", + localState, previousLocalState)); + + // check that in memory data structures have been removed once the new cluster state is applied, + // but the persistent data is still there + RecordingIndicesService indicesService = (RecordingIndicesService) indicesCSSvc.indicesService; + for (IndexMetaData indexMetaData : stateWithIndex.metaData()) { + Index index = indexMetaData.getIndex(); + assertNull(indicesService.indexService(index)); + assertFalse(indicesService.isDeleted(index)); + } + } + + public ClusterState randomInitialClusterState(Map clusterStateServiceMap, + Supplier indicesServiceSupplier) { List allNodes = new ArrayList<>(); DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master allNodes.add(localNode); @@ -109,14 +175,15 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice } ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()])); // add nodes to clusterStateServiceMap - updateNodes(state, clusterStateServiceMap); + updateNodes(state, clusterStateServiceMap, indicesServiceSupplier); return state; } - private void updateNodes(ClusterState state, Map clusterStateServiceMap) { + private void updateNodes(ClusterState state, Map clusterStateServiceMap, + Supplier indicesServiceSupplier) { for (DiscoveryNode node : state.nodes()) { clusterStateServiceMap.computeIfAbsent(node, discoveryNode -> { - IndicesClusterStateService ics = createIndicesClusterStateService(); + IndicesClusterStateService ics = createIndicesClusterStateService(indicesServiceSupplier); ics.start(); return ics; }); @@ -131,7 +198,8 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice } public ClusterState randomlyUpdateClusterState(ClusterState state, - Map clusterStateServiceMap) { + Map clusterStateServiceMap, + Supplier indicesServiceSupplier) { // randomly create new indices (until we have 200 max) for (int i = 0; i < randomInt(5); i++) { if (state.metaData().indices().size() > 200) { @@ -229,7 +297,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).put(createNode()).build(); state = ClusterState.builder(state).nodes(newNodes).build(); state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave - updateNodes(state, clusterStateServiceMap); + updateNodes(state, clusterStateServiceMap, indicesServiceSupplier); } } else { // remove node @@ -239,7 +307,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).remove(discoveryNode.getId()).build(); state = ClusterState.builder(state).nodes(newNodes).build(); state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node join - updateNodes(state, clusterStateServiceMap); + updateNodes(state, clusterStateServiceMap, indicesServiceSupplier); } } } @@ -263,11 +331,11 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build(); } - private IndicesClusterStateService createIndicesClusterStateService() { + private IndicesClusterStateService createIndicesClusterStateService(final Supplier indicesServiceSupplier) { final ThreadPool threadPool = mock(ThreadPool.class); final Executor executor = mock(Executor.class); when(threadPool.generic()).thenReturn(executor); - final MockIndicesService indicesService = new MockIndicesService(); + final MockIndicesService indicesService = indicesServiceSupplier.get(); final TransportService transportService = new TransportService(Settings.EMPTY, null, threadPool); final ClusterService clusterService = mock(ClusterService.class); final RepositoriesService repositoriesService = new RepositoriesService(Settings.EMPTY, clusterService, @@ -279,4 +347,20 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice threadPool, recoveryTargetService, shardStateAction, null, repositoriesService, null, null, null, null, null); } + private class RecordingIndicesService extends MockIndicesService { + private Set deletedIndices = Collections.emptySet(); + + @Override + public synchronized void deleteIndex(Index index, String reason) { + super.deleteIndex(index, reason); + Set newSet = Sets.newHashSet(deletedIndices); + newSet.add(index); + deletedIndices = Collections.unmodifiableSet(newSet); + } + + public synchronized boolean isDeleted(Index index) { + return deletedIndices.contains(index); + } + } + }