Avoid double-recovery when state recovery delayed
Today if state recovery is delayed by the `gateway.recover_after_*` settings then we may end up performing state recovery twice: once when enough nodes have joined the cluster, and again when the timeout elapses. The second state recovery reinitializes the routing table, effectively discarding all recovered/recovering shards and starting again from scratch. This commit adds a check to prevent this second state recovery. Closes #55564
This commit is contained in:
parent
d9685a0f19
commit
06b3345787
|
@ -232,6 +232,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
|||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
logger.debug("performing state recovery...");
|
||||
recoveryRunnable.run();
|
||||
}
|
||||
});
|
||||
|
@ -248,6 +249,11 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
|
|||
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState currentState) {
|
||||
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
|
||||
logger.debug("cluster is already recovered");
|
||||
return currentState;
|
||||
}
|
||||
|
||||
final ClusterState newState = Function.<ClusterState>identity()
|
||||
.andThen(ClusterStateUpdaters::updateRoutingTable)
|
||||
.andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
|
||||
|
|
|
@ -19,21 +19,49 @@
|
|||
|
||||
package org.elasticsearch.gateway;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.EmptyClusterInfoService;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.gateway.TestGatewayAllocator;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
|
||||
public class GatewayServiceTests extends ESTestCase {
|
||||
|
||||
private GatewayService createService(final Settings.Builder settings) {
|
||||
final ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "GatewayServiceTests").build(),
|
||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
|
||||
null);
|
||||
return new GatewayService(settings.build(), null, clusterService, null, null, null);
|
||||
final AllocationService allocationService = new AllocationService(new AllocationDeciders(new HashSet<>(
|
||||
Arrays.asList(new SameShardAllocationDecider(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
||||
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), new ReplicaAfterPrimaryActiveAllocationDecider()))),
|
||||
new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE);
|
||||
return new GatewayService(settings.build(), allocationService, clusterService, null, null, null);
|
||||
}
|
||||
|
||||
public void testDefaultRecoverAfterTime() {
|
||||
|
@ -69,4 +97,23 @@ public class GatewayServiceTests extends ESTestCase {
|
|||
assertSettingDeprecationsAndWarnings(new Setting<?>[] {GatewayService.RECOVER_AFTER_MASTER_NODES_SETTING });
|
||||
}
|
||||
|
||||
public void testRecoverStateUpdateTask() throws Exception {
|
||||
GatewayService service = createService(Settings.builder());
|
||||
ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask();
|
||||
String nodeId = randomAlphaOfLength(10);
|
||||
DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT)
|
||||
.put(Node.NODE_MASTER_SETTING.getKey(), true).build(),
|
||||
new TransportAddress(TransportAddress.META_ADDRESS, 9300), nodeId);
|
||||
ClusterState stateWithBlock = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build()).
|
||||
blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK).build()).build();
|
||||
|
||||
ClusterState recoveredState = clusterStateUpdateTask.execute(stateWithBlock);
|
||||
assertNotEquals(recoveredState, stateWithBlock);
|
||||
assertThat(recoveredState.blocks().global(ClusterBlockLevel.METADATA_WRITE), not(hasItem(STATE_NOT_RECOVERED_BLOCK)));
|
||||
|
||||
ClusterState clusterState = clusterStateUpdateTask.execute(recoveredState);
|
||||
assertSame(recoveredState, clusterState);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue