Trigger replica recovery restarts by master when primary relocation completes (#23926)

When a primary relocation completes while there are ongoing replica recoveries, the recoveries for these replicas need to be restarted (as a new primary is in charge of replicating changes). Before this commit, the need for a recovery restart was detected by the data nodes holding the replicas: on each cluster state update they checked whether the recovery process had completed before the recovery source changed. That code had a race, however, which could lead to a not-fully-recovered shard exposing itself as started (see #23904).
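
For context, the data-node-side check removed below (in IndicesClusterStateService) worked roughly as follows; the comments marking the race window are an annotation of the issue described above, not part of the original code:

// condensed from the block removed from IndicesClusterStateService below
if (newShardRouting.recoverySource() != null && newShardRouting.recoverySource().getType() == Type.PEER) {
    RecoveryState recoveryState = shard.recoveryState();
    final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, newShardRouting);
    if (recoveryState.getSourceNode().equals(sourceNode) == false) {
        // RACE: the recovery can complete concurrently, between the source-node
        // comparison above and the cancellation attempt below ...
        if (recoveryTargetService.cancelRecoveriesForShard(shardId, "recovery source node changed")) {
            // getting here means that the shard was still recovering
            indexService.removeShard(shardId.id(), "removing shard (recovery source node changed)");
        }
        // ... in which case the copy is kept and can expose itself as started even
        // though it recovered from a node that is no longer the primary (see #23904)
    }
}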

This commit takes a different approach: when the primary relocation completes and the master updates the cluster state to move the primary shard from relocating to started, it reinitializes all initializing replica shards by giving them a fresh allocation id. Data nodes that have the replica shard simply detect that the allocation id changed and restart the recovery process (instead of trying to determine the need for a restart based on ongoing recoveries).
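
A minimal sketch of the resulting data-node-side decision (illustrative only: the class and method names are ours, not the literal IndicesClusterStateService code). The node no longer reasons about recovery progress at all; it only compares allocation ids:

import org.elasticsearch.cluster.routing.ShardRouting;

final class ReplicaRecoveryRestartCheck {

    // a fresh allocation id handed out by the master means the local shard copy
    // is stale and must be removed and re-recovered from the new primary
    static boolean mustRestartRecovery(ShardRouting currentRouting, ShardRouting newRouting) {
        assert currentRouting.shardId().equals(newRouting.shardId()) : "same shard expected";
        return currentRouting.allocationId().getId().equals(newRouting.allocationId().getId()) == false;
    }
}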

Note: the removal of the code in IndicesClusterStateService that checks whether the recovery source has changed will not be backported to the 5.x branch. This preserves backward compatibility for the situation where the master node is older and does not have the changes introduced in this PR.

Closes #23904
Yannick Welsch, 2017-04-11 11:21:57 +02:00, committed by GitHub
parent 0114f0061c
commit 88a54f14c7
11 changed files with 173 additions and 36 deletions

RoutingChangesObserver.java

@@ -69,6 +69,12 @@ public interface RoutingChangesObserver {
      */
     void replicaPromoted(ShardRouting replicaShard);
 
+    /**
+     * Called when an initializing replica is reinitialized. This happens when a primary relocation completes, which
+     * reinitializes all currently initializing replicas as their recovery source node changes
+     */
+    void initializedReplicaReinitialized(ShardRouting oldReplica, ShardRouting reinitializedReplica);
+
     /**
      * Abstract implementation of {@link RoutingChangesObserver} that does not take any action. Useful for subclasses that only override
@@ -120,6 +126,11 @@ public interface RoutingChangesObserver {
         public void replicaPromoted(ShardRouting replicaShard) {
 
         }
+
+        @Override
+        public void initializedReplicaReinitialized(ShardRouting oldReplica, ShardRouting reinitializedReplica) {
+
+        }
     }
 
     class DelegatingRoutingChangesObserver implements RoutingChangesObserver {
@@ -192,5 +203,12 @@ public interface RoutingChangesObserver {
                 routingChangesObserver.replicaPromoted(replicaShard);
             }
         }
+
+        @Override
+        public void initializedReplicaReinitialized(ShardRouting oldReplica, ShardRouting reinitializedReplica) {
+            for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) {
+                routingChangesObserver.initializedReplicaReinitialized(oldReplica, reinitializedReplica);
+            }
+        }
     }
 }

RoutingNodes.java

@@ -451,6 +451,9 @@ public class RoutingNodes implements Iterable<RoutingNode> {
      *
      * Moves the initializing shard to started. If the shard is a relocation target, also removes the relocation source.
      *
+     * If the started shard is a primary relocation target, this also reinitializes currently initializing replicas as their
+     * recovery source changes
+     *
      * @return the started shard
      */
     public ShardRouting startShard(Logger logger, ShardRouting initializingShard, RoutingChangesObserver routingChangesObserver) {
@@ -468,6 +471,30 @@ public class RoutingNodes implements Iterable<RoutingNode> {
                 + initializingShard + " but was: " + relocationSourceShard.getTargetRelocatingShard();
             remove(relocationSourceShard);
             routingChangesObserver.relocationCompleted(relocationSourceShard);
+
+            // if this is a primary shard with ongoing replica recoveries, reinitialize them as their recovery source changed
+            if (startedShard.primary()) {
+                List<ShardRouting> assignedShards = assignedShards(startedShard.shardId());
+                // copy list to prevent ConcurrentModificationException
+                for (ShardRouting routing : new ArrayList<>(assignedShards)) {
+                    if (routing.initializing() && routing.primary() == false) {
+                        if (routing.isRelocationTarget()) {
+                            // find the relocation source
+                            ShardRouting sourceShard = getByAllocationId(routing.shardId(), routing.allocationId().getRelocationId());
+                            // cancel relocation and start relocation to same node again
+                            ShardRouting startedReplica = cancelRelocation(sourceShard);
+                            remove(routing);
+                            routingChangesObserver.shardFailed(routing,
+                                new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "primary changed"));
+                            relocateShard(startedReplica, sourceShard.relocatingNodeId(),
+                                sourceShard.getExpectedShardSize(), routingChangesObserver);
+                        } else {
+                            ShardRouting reinitializedReplica = reinitReplica(routing);
+                            routingChangesObserver.initializedReplicaReinitialized(routing, reinitializedReplica);
+                        }
+                    }
+                }
+            }
         }
         return startedShard;
     }
@@ -730,6 +757,15 @@ public class RoutingNodes implements Iterable<RoutingNode> {
         return reinitializedShard;
     }
 
+    private ShardRouting reinitReplica(ShardRouting shard) {
+        assert shard.primary() == false : "shard must be a replica: " + shard;
+        assert shard.initializing() : "can only reinitialize an initializing replica: " + shard;
+        assert shard.isRelocationTarget() == false : "replication target cannot be reinitialized: " + shard;
+        ShardRouting reinitializedShard = shard.reinitializeReplicaShard();
+        updateAssigned(shard, reinitializedShard);
+        return reinitializedShard;
+    }
+
     private void updateAssigned(ShardRouting oldShard, ShardRouting newShard) {
         assert oldShard.shardId().equals(newShard.shardId()) :
             "can only update " + oldShard + " by shard with same shard id but was " + newShard;

ShardRouting.java

@@ -393,6 +393,17 @@ public final class ShardRouting implements Writeable, ToXContent {
             allocationId, UNAVAILABLE_EXPECTED_SHARD_SIZE);
     }
 
+    /**
+     * Reinitializes a replica shard, giving it a fresh allocation id
+     */
+    public ShardRouting reinitializeReplicaShard() {
+        assert state == ShardRoutingState.INITIALIZING : this;
+        assert primary == false : this;
+        assert isRelocationTarget() == false : this;
+        return new ShardRouting(shardId, currentNodeId, null, primary, ShardRoutingState.INITIALIZING,
+            recoverySource, unassignedInfo, AllocationId.newInitializing(), expectedShardSize);
+    }
+
     /**
      * Set the shards state to <code>STARTED</code>. The shards state must be
      * <code>INITIALIZING</code> or <code>RELOCATING</code>. Any relocation will be

AllocationService.java

@@ -41,7 +41,9 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.gateway.GatewayAllocator;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Function;
@@ -88,6 +90,9 @@ public class AllocationService extends AbstractComponent {
         routingNodes.unassigned().shuffle();
         RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
             clusterInfoService.getClusterInfo(), currentNanoTime(), false);
+        // as starting a primary relocation target can reinitialize replica shards, start replicas first
+        startedShards = new ArrayList<>(startedShards);
+        Collections.sort(startedShards, Comparator.comparing(ShardRouting::primary));
         applyStartedShards(allocation, startedShards);
         gatewayAllocator.applyStartedShards(allocation, startedShards);
         reroute(allocation);

RoutingNodesChangedObserver.java

@@ -96,6 +96,17 @@ public class RoutingNodesChangedObserver implements RoutingChangesObserver {
         setChanged();
     }
 
+    @Override
+    public void initializedReplicaReinitialized(ShardRouting oldReplica, ShardRouting reinitializedReplica) {
+        assert oldReplica.initializing() && oldReplica.primary() == false :
+            "expected initializing replica shard " + oldReplica;
+        assert reinitializedReplica.initializing() && reinitializedReplica.primary() == false :
+            "expected reinitialized replica shard " + reinitializedReplica;
+        assert oldReplica.allocationId().getId().equals(reinitializedReplica.allocationId().getId()) == false :
+            "expected allocation id to change for reinitialized replica shard (old: " + oldReplica + " new: " + reinitializedReplica + ")";
+        setChanged();
+    }
+
     /**
      * Marks the allocation as changed.
      */

IndexShard.java

@@ -416,6 +416,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 // active primaries.
                 throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
             }
+            assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.RELOCATED ||
+                state == IndexShardState.CLOSED :
+                "routing is active, but local shard state isn't. routing: " + newRouting + ", local state: " + state;
             this.shardRouting = newRouting;
             persistMetadata(newRouting, currentRouting);
         }
@@ -498,6 +501,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * @return the previous shard state
      */
     private IndexShardState changeState(IndexShardState newState, String reason) {
+        assert Thread.holdsLock(mutex);
         logger.debug("state: [{}]->[{}], reason [{}]", state, newState, reason);
         IndexShardState previousState = state;
         state = newState;

IndicesClusterStateService.java

@@ -403,20 +403,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
             // state may result in a new shard being initialized while having the same allocation id as the currently started shard.
             logger.debug("{} removing shard (not active, current {}, new {})", shardId, currentRoutingEntry, newShardRouting);
             indexService.removeShard(shardId.id(), "removing shard (stale copy)");
-        } else {
-            // remove shards where recovery source has changed. This re-initializes shards later in createOrUpdateShards
-            if (newShardRouting.recoverySource() != null && newShardRouting.recoverySource().getType() == Type.PEER) {
-                RecoveryState recoveryState = shard.recoveryState();
-                final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, newShardRouting);
-                if (recoveryState.getSourceNode().equals(sourceNode) == false) {
-                    if (recoveryTargetService.cancelRecoveriesForShard(shardId, "recovery source node changed")) {
-                        // getting here means that the shard was still recovering
-                        logger.debug("{} removing shard (recovery source changed), current [{}], global [{}], shard [{}])",
-                            shardId, recoveryState.getSourceNode(), sourceNode, newShardRouting);
-                        indexService.removeShard(shardId.id(), "removing shard (recovery source node changed)");
-                    }
-                }
-            }
         }
     }
 }

PeerRecoveryTargetService.java

@@ -126,17 +126,6 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
         }
     }
 
-    /**
-     * Cancel all ongoing recoveries for the given shard.
-     *
-     * @param reason  reason for cancellation
-     * @param shardId shard ID for which to cancel recoveries
-     * @return {@code true} if a recovery was cancelled
-     */
-    public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
-        return onGoingRecoveries.cancelRecoveriesForShard(shardId, reason);
-    }
-
     public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
         // create a new recovery status, and process...
         final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());

StartedShardsRoutingTests.java

@@ -33,11 +33,14 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
@@ -84,4 +87,69 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase {
         assertThat(shardRouting.currentNodeId(), equalTo("node2"));
         assertThat(shardRouting.relocatingNodeId(), nullValue());
     }
+
+    public void testRelocatingPrimariesWithInitializingReplicas() {
+        AllocationService allocation = createAllocationService();
+
+        logger.info("--> building initial cluster state");
+        AllocationId primaryId = AllocationId.newRelocation(AllocationId.newInitializing());
+        AllocationId replicaId = AllocationId.newInitializing();
+        boolean relocatingReplica = randomBoolean();
+        if (relocatingReplica) {
+            replicaId = AllocationId.newRelocation(replicaId);
+        }
+        final IndexMetaData indexMetaData = IndexMetaData.builder("test")
+            .settings(settings(Version.CURRENT))
+            .numberOfShards(1).numberOfReplicas(1)
+            .putInSyncAllocationIds(0,
+                relocatingReplica ? Sets.newHashSet(primaryId.getId(), replicaId.getId()) : Sets.newHashSet(primaryId.getId()))
+            .build();
+        final Index index = indexMetaData.getIndex();
+        ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")).add(newNode("node3")).add(newNode("node4")))
+            .metaData(MetaData.builder().put(indexMetaData, false));
+        final ShardRouting relocatingPrimary = TestShardRouting.newShardRouting(
+            new ShardId(index, 0), "node1", "node2", true, ShardRoutingState.RELOCATING, primaryId);
+        final ShardRouting replica = TestShardRouting.newShardRouting(
+            new ShardId(index, 0), "node3", relocatingReplica ? "node4" : null, false,
+            relocatingReplica ? ShardRoutingState.RELOCATING : ShardRoutingState.INITIALIZING, replicaId);
+        stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index)
+            .addIndexShard(new IndexShardRoutingTable.Builder(relocatingPrimary.shardId())
+                .addShard(relocatingPrimary)
+                .addShard(replica)
+                .build()))
+            .build());
+        ClusterState state = stateBuilder.build();
+
+        logger.info("--> test starting of relocating primary shard with initializing / relocating replica");
+        ClusterState newState = allocation.applyStartedShards(state, Arrays.asList(relocatingPrimary.getTargetRelocatingShard()));
+        assertNotEquals(newState, state);
+        assertTrue(newState.routingTable().index("test").allPrimaryShardsActive());
+        ShardRouting startedReplica = newState.routingTable().index("test").shard(0).replicaShards().get(0);
+        if (relocatingReplica) {
+            assertTrue(startedReplica.relocating());
+            assertEquals(replica.currentNodeId(), startedReplica.currentNodeId());
+            assertEquals(replica.relocatingNodeId(), startedReplica.relocatingNodeId());
+            assertEquals(replica.allocationId().getId(), startedReplica.allocationId().getId());
+            assertNotEquals(replica.allocationId().getRelocationId(), startedReplica.allocationId().getRelocationId());
+        } else {
+            assertTrue(startedReplica.initializing());
+            assertEquals(replica.currentNodeId(), startedReplica.currentNodeId());
+            assertNotEquals(replica.allocationId().getId(), startedReplica.allocationId().getId());
+        }
+
+        logger.info("--> test starting of relocating primary shard together with initializing / relocating replica");
+        List<ShardRouting> startedShards = new ArrayList<>();
+        startedShards.add(relocatingPrimary.getTargetRelocatingShard());
+        startedShards.add(relocatingReplica ? replica.getTargetRelocatingShard() : replica);
+        Collections.shuffle(startedShards, random());
+        newState = allocation.applyStartedShards(state, startedShards);
+        assertNotEquals(newState, state);
+        assertTrue(newState.routingTable().index("test").shard(0).allShardsStarted());
+    }
 }

IndexShardTests.java

@@ -284,7 +284,7 @@ public class IndexShardTests extends IndexShardTestCase {
                 true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing())));
         } else if (randomBoolean()) {
             // simulate promotion
-            indexShard = newShard(shardId, false);
+            indexShard = newStartedShard(false);
             ShardRouting replicaRouting = indexShard.routingEntry();
             indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
             ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,

ClusterStateChanges.java

@@ -41,9 +41,12 @@ import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.EmptyClusterInfoService;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
+import org.elasticsearch.cluster.action.shard.ShardStateAction.ShardEntry;
 import org.elasticsearch.cluster.metadata.AliasValidator;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -217,23 +220,29 @@ public class ClusterStateChanges extends AbstractComponent {
     }
 
     public ClusterState applyFailedShards(ClusterState clusterState, List<FailedShard> failedShards) {
-        List<ShardStateAction.ShardEntry> entries = failedShards.stream().map(failedShard ->
-            new ShardStateAction.ShardEntry(failedShard.getRoutingEntry().shardId(), failedShard.getRoutingEntry().allocationId().getId(),
+        List<ShardEntry> entries = failedShards.stream().map(failedShard ->
+            new ShardEntry(failedShard.getRoutingEntry().shardId(), failedShard.getRoutingEntry().allocationId().getId(),
                 0L, failedShard.getMessage(), failedShard.getFailure()))
             .collect(Collectors.toList());
-        try {
-            return shardFailedClusterStateTaskExecutor.execute(clusterState, entries).resultingState;
-        } catch (Exception e) {
-            throw ExceptionsHelper.convertToRuntime(e);
-        }
+        return runTasks(shardFailedClusterStateTaskExecutor, clusterState, entries);
     }
 
     public ClusterState applyStartedShards(ClusterState clusterState, List<ShardRouting> startedShards) {
-        List<ShardStateAction.ShardEntry> entries = startedShards.stream().map(startedShard ->
-            new ShardStateAction.ShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), 0L, "shard started", null))
+        List<ShardEntry> entries = startedShards.stream().map(startedShard ->
+            new ShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), 0L, "shard started", null))
             .collect(Collectors.toList());
+        return runTasks(shardStartedClusterStateTaskExecutor, clusterState, entries);
+    }
+
+    private <T> ClusterState runTasks(ClusterStateTaskExecutor<T> executor, ClusterState clusterState, List<T> entries) {
         try {
-            return shardStartedClusterStateTaskExecutor.execute(clusterState, entries).resultingState;
+            ClusterTasksResult<T> result = executor.execute(clusterState, entries);
+            for (ClusterStateTaskExecutor.TaskResult taskResult : result.executionResults.values()) {
+                if (taskResult.isSuccess() == false) {
+                    throw taskResult.getFailure();
+                }
+            }
+            return result.resultingState;
         } catch (Exception e) {
             throw ExceptionsHelper.convertToRuntime(e);
         }