IndicesClusterStateService should replace an init. replica with an init. primary with the same aId (#32374)

In rare cases it is possible that a nodes gets an instruction to replace a replica
shard that's in `POST_RECOVERY` with a new initializing primary with the same allocation id.
This can happen by batching cluster states that include the starting of the replica, with
closing of the indices, opening it up again and allocating the primary shard to the node in
question. The node should then clean it's initializing replica and replace it with a new
initializing primary.

I'm not sure whether the test I added really adds enough value as existing tests found this. The main reason I added is to allow for simpler reproduction and to double check I fixed it. I'm open to discuss if we should keep.

Closes #32308
This commit is contained in:
Boaz Leskes 2018-07-30 16:24:41 +03:00 committed by GitHub
parent 9a4d0069f6
commit 0cae19c8d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 9 deletions

View File

@ -420,6 +420,12 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
// state may result in a new shard being initialized while having the same allocation id as the currently started shard.
logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
indexService.removeShard(shardId.id(), "removing shard (stale copy)");
} else if (newShardRouting.primary() && currentRoutingEntry.primary() == false && newShardRouting.initializing()) {
assert currentRoutingEntry.initializing() : currentRoutingEntry; // see above if clause
// this can happen when cluster state batching batches activation of the shard, closing an index, reopening it
// and assigning an initializing primary to this node
logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
indexService.removeShard(shardId.id(), "removing shard (stale copy)");
}
}
}

View File

@ -27,10 +27,12 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable.Builder;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
@ -44,6 +46,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
@ -93,7 +96,8 @@ public class ClusterStateCreationUtils {
IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm).build();
.put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm)
.build();
RoutingTable.Builder routing = new RoutingTable.Builder();
routing.addAsNew(indexMetaData);
@ -138,12 +142,19 @@ public class ClusterStateCreationUtils {
TestShardRouting.newShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState,
unassignedInfo));
}
final IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();
IndexMetaData.Builder indexMetaDataBuilder = new IndexMetaData.Builder(indexMetaData);
indexMetaDataBuilder.putInSyncAllocationIds(0,
indexShardRoutingTable.activeShards().stream().map(ShardRouting::allocationId).map(AllocationId::getId)
.collect(Collectors.toSet())
);
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded());
state.metaData(MetaData.builder().put(indexMetaDataBuilder.build(), false).generateClusterUuidIfNeeded());
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(indexMetaData.getIndex())
.addIndexShard(indexShardRoutingBuilder.build())).build());
.addIndexShard(indexShardRoutingTable)).build());
return state.build();
}

View File

@ -74,6 +74,10 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
enableRandomFailures = randomBoolean();
}
protected void disableRandomFailures() {
enableRandomFailures = false;
}
protected void failRandomly() {
if (enableRandomFailures && rarely()) {
throw new RuntimeException("dummy test failure");

View File

@ -49,6 +49,7 @@ import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -75,6 +76,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPA
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -97,7 +99,6 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
terminate(threadPool);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32308")
public void testRandomClusterStateUpdates() {
// we have an IndicesClusterStateService per node in the cluster
final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>();
@ -199,6 +200,59 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
}
}
/**
* In rare cases it is possible that a nodes gets an instruction to replace a replica
* shard that's in POST_RECOVERY with a new initializing primary with the same allocation id.
* This can happen by batching cluster states that include the starting of the replica, with
* closing of the indices, opening it up again and allocating the primary shard to the node in
* question. The node should then clean it's initializing replica and replace it with a new
* initializing primary.
*/
public void testInitializingPrimaryRemovesInitializingReplicaWithSameAID() {
disableRandomFailures();
String index = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
ClusterState state = ClusterStateCreationUtils.state(index, randomBoolean(),
ShardRoutingState.STARTED, ShardRoutingState.INITIALIZING);
// the initial state which is derived from the newly created cluster state but doesn't contain the index
ClusterState previousState = ClusterState.builder(state)
.metaData(MetaData.builder(state.metaData()).remove(index))
.routingTable(RoutingTable.builder().build())
.build();
// pick a data node to simulate the adding an index cluster state change event on, that has shards assigned to it
final ShardRouting shardRouting = state.routingTable().index(index).shard(0).replicaShards().get(0);
final ShardId shardId = shardRouting.shardId();
DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId());
// simulate the cluster state change on the node
ClusterState localState = adaptClusterStateToLocalNode(state, node);
ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node);
IndicesClusterStateService indicesCSSvc = createIndicesClusterStateService(node, RecordingIndicesService::new);
indicesCSSvc.start();
indicesCSSvc.applyClusterState(new ClusterChangedEvent("cluster state change that adds the index", localState, previousLocalState));
previousState = state;
// start the replica
state = cluster.applyStartedShards(state, state.routingTable().index(index).shard(0).replicaShards());
// close the index and open it up again (this will sometimes swap roles between primary and replica)
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(state.metaData().index(index).getIndex().getName());
state = cluster.closeIndices(state, closeIndexRequest);
OpenIndexRequest openIndexRequest = new OpenIndexRequest(state.metaData().index(index).getIndex().getName());
state = cluster.openIndices(state, openIndexRequest);
localState = adaptClusterStateToLocalNode(state, node);
previousLocalState = adaptClusterStateToLocalNode(previousState, node);
indicesCSSvc.applyClusterState(new ClusterChangedEvent("new cluster state", localState, previousLocalState));
final MockIndexShard shardOrNull = ((RecordingIndicesService) indicesCSSvc.indicesService).getShardOrNull(shardId);
assertThat(shardOrNull == null ? null : shardOrNull.routingEntry(),
equalTo(state.getRoutingNodes().node(node.getId()).getByShardId(shardId)));
}
public ClusterState randomInitialClusterState(Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap,
Supplier<MockIndicesService> indicesServiceSupplier) {
List<DiscoveryNode> allNodes = new ArrayList<>();