Make primary-replica resync failures less lenient (#28534)

Today, failures from the primary-replica resync are ignored as a best effort to avoid
marking shards as stale during a cluster restart. However, this is problematic if a
replica fails to execute the resync operations but handles the subsequent write
operations just fine: that replica will be missing some operations from the new primary.
If the local checkpoint on the replica cannot advance because of the missing operations,
there are several implications; the sketch after the list below illustrates the
underlying checkpoint mechanics.

1. The global checkpoint won't advance - this causes both the primary and the
replicas to retain many index commits.

2. The engine on the replica won't flush periodically because the uncommitted
stats are calculated from the local checkpoint.

3. The replica can use a large number of bitsets to keep track of operation seqnos.
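
To make the checkpoint behaviour concrete, here is a small self-contained sketch in plain
Java. It is only an illustration and uses none of the Elasticsearch classes; it models the
standard rule that the local checkpoint is the highest seqno below which no operations are
missing, and that the global checkpoint is the minimum local checkpoint across in-sync copies.

    // Hypothetical illustration only (not Elasticsearch code): a replica that missed one
    // resync operation has a hole in its sequence-number history, so its local checkpoint,
    // and therefore the shard's global checkpoint, cannot advance past the hole.
    import java.util.HashSet;
    import java.util.Set;

    class CheckpointGapSketch {

        // Local checkpoint: highest seqno N such that every operation with seqno <= N was processed.
        static long localCheckpoint(Set<Long> processedSeqNos) {
            long checkpoint = -1; // no operations performed yet
            while (processedSeqNos.contains(checkpoint + 1)) {
                checkpoint++;
            }
            return checkpoint;
        }

        public static void main(String[] args) {
            Set<Long> primaryOps = new HashSet<>();
            Set<Long> replicaOps = new HashSet<>();
            for (long seqNo = 0; seqNo <= 9; seqNo++) {
                primaryOps.add(seqNo);
                if (seqNo != 3) {               // the replica missed the resync operation with seqno 3
                    replicaOps.add(seqNo);
                }
            }
            long primaryLocal = localCheckpoint(primaryOps);     // 9
            long replicaLocal = localCheckpoint(replicaOps);     // 2: stuck below the missing seqno
            // Global checkpoint: minimum local checkpoint over the in-sync copies.
            long global = Math.min(primaryLocal, replicaLocal);  // 2: commits and translog above it must be kept
            System.out.println("primary=" + primaryLocal + " replica=" + replicaLocal + " global=" + global);
        }
    }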

However, we can prevent this issue while still keeping the best-effort behaviour by
failing replicas that fail to execute resync operations, without marking them as
stale. The required infrastructure for this change was prepared in #28049 and #28054.
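
Conceptually, the change only affects what happens when a replica rejects a resync
operation; regular replicated writes keep failing the copy and marking it as stale. The
following is a hedged sketch of that policy with hypothetical names, not the actual
Elasticsearch API; in the real diff below the switch is simply the boolean argument
(whether to mark the copy as stale) passed to ShardStateAction.remoteShardFailed.

    // Hypothetical sketch of the failure-handling policy, not the actual Elasticsearch API.
    class ReplicaFailurePolicySketch {

        enum Action { IGNORE_FAILURE, FAIL_SHARD, FAIL_SHARD_AND_MARK_STALE }

        static Action onReplicaFailure(boolean duringResync, boolean beforeThisChange) {
            if (duringResync) {
                // old behaviour: swallow the failure; new behaviour: fail the copy but keep it in the in-sync set
                return beforeThisChange ? Action.IGNORE_FAILURE : Action.FAIL_SHARD;
            }
            // regular replicated writes: fail the copy and mark it as stale
            return Action.FAIL_SHARD_AND_MARK_STALE;
        }

        public static void main(String[] args) {
            System.out.println("resync, before #28534: " + onReplicaFailure(true, true));
            System.out.println("resync, after  #28534: " + onReplicaFailure(true, false));
            System.out.println("regular write:         " + onReplicaFailure(false, false));
        }
    }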

Relates #24841
Nhat Nguyen 2018-03-09 09:55:45 -08:00 committed by GitHub
parent d9cc6b9270
commit 4973887a10
4 changed files with 128 additions and 30 deletions

TransportResyncReplicationAction.java

@@ -30,6 +30,7 @@ import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@@ -45,6 +46,7 @@ import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
@@ -83,8 +85,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
@Override
protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) {
// We treat the resync as best-effort for now and don't mark unavailable shard copies as stale.
return new ReplicasProxy(primaryTerm);
return new ResyncActionReplicasProxy(primaryTerm);
}
@Override
@@ -184,4 +185,22 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
});
}
/**
* A proxy for primary-replica resync operations which are performed on replicas when a new primary is promoted.
* Replica shards that fail to execute resync operations will be failed but won't be marked as stale.
* This avoids marking shards as stale during a cluster restart while still making the primary-replica resync mandatory.
*/
class ResyncActionReplicasProxy extends ReplicasProxy {
ResyncActionReplicasProxy(long primaryTerm) {
super(primaryTerm);
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
}
}

TransportReplicationAction.java

@@ -1172,6 +1172,30 @@ public abstract class TransportReplicationAction<
// "alive" if it were to be marked as stale.
onSuccess.run();
}
protected final ShardStateAction.Listener createShardActionListener(final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onSuccess.run();
}
@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onPrimaryDemoted.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
}
};
}
}
/**

TransportWriteAction.java

@@ -384,41 +384,16 @@ public abstract class TransportWriteAction<
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
logger.warn((org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
private ShardStateAction.Listener createListener(final Runnable onSuccess, final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ShardStateAction.Listener() {
@Override
public void onSuccess() {
onSuccess.run();
}
@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onPrimaryDemoted.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
}
};
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
}
}

PrimaryAllocationIT.java

@@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -31,6 +32,10 @@ import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
@@ -43,15 +48,23 @@ import org.elasticsearch.test.transport.MockTransportService;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.not;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class PrimaryAllocationIT extends ESIntegTestCase {
@@ -309,4 +322,71 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
assertEquals(1, client().admin().cluster().prepareState().get().getState()
.routingTable().index(indexName).shardsWithState(ShardRoutingState.STARTED).size());
}
/**
* This test asserts that replicas that fail to execute resync operations will be failed but not marked as stale.
*/
public void testPrimaryReplicaResyncFailed() throws Exception {
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
final int numberOfReplicas = between(2, 3);
final String oldPrimary = internalCluster().startDataOnlyNode();
assertAcked(
prepareCreate("test", Settings.builder().put(indexSettings())
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)));
final ShardId shardId = new ShardId(clusterService().state().metaData().index("test").getIndex(), 0);
final Set<String> replicaNodes = new HashSet<>(internalCluster().startDataOnlyNodes(numberOfReplicas));
ensureGreen();
assertAcked(
client(master).admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get());
logger.info("--> Indexing with gap in seqno to ensure that some operations will be replayed in resync");
long numDocs = scaledRandomIntBetween(5, 50);
for (int i = 0; i < numDocs; i++) {
IndexResponse indexResult = index("test", "doc", Long.toString(i));
assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1));
}
final IndexShard oldPrimaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId);
IndexShardTestCase.getEngine(oldPrimaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno.
long moreDocs = scaledRandomIntBetween(1, 10);
for (int i = 0; i < moreDocs; i++) {
IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i));
assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1));
}
final Set<String> replicasSide1 = Sets.newHashSet(randomSubsetOf(between(1, numberOfReplicas - 1), replicaNodes));
final Set<String> replicasSide2 = Sets.difference(replicaNodes, replicasSide1);
NetworkDisruption partition = new NetworkDisruption(new TwoPartitions(replicasSide1, replicasSide2), new NetworkDisconnect());
internalCluster().setDisruptionScheme(partition);
logger.info("--> isolating some replicas during primary-replica resync");
partition.startDisrupting();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(oldPrimary));
// Check that we fail replicas on one side but do not mark them as stale.
assertBusy(() -> {
ClusterState state = client(master).admin().cluster().prepareState().get().getState();
final IndexShardRoutingTable shardRoutingTable = state.routingTable().shardRoutingTable(shardId);
final String newPrimaryNode = state.getRoutingNodes().node(shardRoutingTable.primary.currentNodeId()).node().getName();
assertThat(newPrimaryNode, not(equalTo(oldPrimary)));
Set<String> selectedPartition = replicasSide1.contains(newPrimaryNode) ? replicasSide1 : replicasSide2;
assertThat(shardRoutingTable.activeShards(), hasSize(selectedPartition.size()));
for (ShardRouting activeShard : shardRoutingTable.activeShards()) {
assertThat(state.getRoutingNodes().node(activeShard.currentNodeId()).node().getName(), isIn(selectedPartition));
}
assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1));
}, 1, TimeUnit.MINUTES);
assertAcked(
client(master).admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "all")).get());
partition.stopDisrupting();
logger.info("--> stop disrupting network and re-enable allocation");
assertBusy(() -> {
ClusterState state = client(master).admin().cluster().prepareState().get().getState();
assertThat(state.routingTable().shardRoutingTable(shardId).activeShards(), hasSize(numberOfReplicas));
assertThat(state.metaData().index("test").inSyncAllocationIds(shardId.id()), hasSize(numberOfReplicas + 1));
for (String node : replicaNodes) {
IndexShard shard = internalCluster().getInstance(IndicesService.class, node).getShardOrNull(shardId);
assertThat(shard.getLocalCheckpoint(), equalTo(numDocs + moreDocs));
}
});
}
}