Fix Snapshot Abort Not Waiting for Data Nodes (#58214) (#58228)

This was a really subtle bug that we introduced a long time ago.
If a shard snapshot is in aborted state but hasn't started snapshotting on a node
we can only send the failed notification for it if the shard was actually supposed
to execute on the local node.
Without this fix, if shard snapshots were spread out across at least two data nodes
(so that each data node does not have all the primaries) the abort would actually
never wait on the data nodes. This isn't a big deal with uuid shard generations
but could lead to potential corruption on S3 when using numeric shard generations
(albeit very unlikely now that we have the 3 minute wait there).
Another negative side-effect of this bug was that master would receive a lot more
shard status update messages for aborted shards since each data node not assigned
a primary would send one message for that primary.
This commit is contained in:
Armin Braun 2020-06-17 11:39:50 +02:00 committed by GitHub
parent 9894d90e0b
commit 85be78b624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 6 deletions

View File

@ -33,11 +33,14 @@ import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRe
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.SnapshotsInProgress;
@ -45,6 +48,7 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
@ -85,6 +89,10 @@ import org.elasticsearch.test.TestCustomMetadata;
import org.elasticsearch.test.disruption.BusyMasterServiceDisruption;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -102,6 +110,7 @@ import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@ -1355,6 +1364,62 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId));
}
public void testAbortWaitsOnDataNode() throws Exception {
internalCluster().startMasterOnlyNode();
final String dataNodeName = internalCluster().startDataOnlyNode();
final String indexName = "test-index";
createIndex(indexName);
index(indexName, "_doc", "some_id", "foo", "bar");
final String otherDataNode = internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock", randomRepoPath());
blockAllDataNodes(repoName);
final String snapshotName = "test-snap";
final ActionFuture<CreateSnapshotResponse> snapshotResponse =
client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute();
waitForBlock(dataNodeName, repoName, TimeValue.timeValueSeconds(30L));
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, otherDataNode);
final PlainActionFuture<Void> abortVisibleFuture = PlainActionFuture.newFuture();
clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null && snapshotsInProgress.entries().stream()
.anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED))
abortVisibleFuture.onResponse(null);
clusterService.removeListener(this);
}
});
final AtomicBoolean blocked = new AtomicBoolean(true);
final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode);
transportService.addMessageListener(new TransportMessageListener() {
@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {
if (blocked.get() && action.equals(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME)) {
throw new AssertionError("Node had no assigned shard snapshots so it shouldn't send out shard state updates");
}
}
});
logger.info("--> abort snapshot");
final ActionFuture<AcknowledgedResponse> deleteResponse =
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute();
abortVisibleFuture.get(30L, TimeUnit.SECONDS);
assertFalse("delete should not be able to finish until data node is unblocked", deleteResponse.isDone());
blocked.set(false);
unblockAllDataNodes(repoName);
assertAcked(deleteResponse.get());
assertThat(snapshotResponse.get().getSnapshotInfo().state(), is(SnapshotState.FAILED));
}
private long calculateTotalFilesSize(List<Path> files) {
return files.stream().mapToLong(f -> {
try {

View File

@ -94,8 +94,6 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
private static final Logger logger = LogManager.getLogger(SnapshotShardsService.class);
private static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status";
private final ClusterService clusterService;
private final IndicesService indicesService;
@ -136,7 +134,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
@Override
protected void doStart() {
assert this.updateSnapshotStatusHandler != null;
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
assert transportService.getRequestHandler(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
}
@Override
@ -262,7 +260,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
if (snapshotStatus == null) {
// due to CS batching we might have missed the INIT state and straight went into ABORTED
// notify master that abort has completed by moving to FAILED
if (shard.value.state() == ShardState.ABORTED) {
if (shard.value.state() == ShardState.ABORTED && localNodeId.equals(shard.value.nodeId())) {
notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason());
}
} else {
@ -549,7 +547,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
() -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e);
}
},
(req, reqListener) -> transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, req,
(req, reqListener) -> transportService.sendRequest(transportService.getLocalNode(),
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, req,
new TransportResponseHandler<UpdateIndexShardSnapshotStatusResponse>() {
@Override
public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
@ -667,7 +666,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(
SnapshotShardsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, false, transportService, clusterService, threadPool,
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, false, transportService, clusterService, threadPool,
actionFilters, UpdateIndexShardSnapshotStatusRequest::new, indexNameExpressionResolver
);
}

View File

@ -118,6 +118,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private static final Logger logger = LogManager.getLogger(SnapshotsService.class);
public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status";
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;