Minor bugs/inconsistencies: If a shard hasn't changed at all, we were reporting `0` for its total size and total file count while the snapshot was ongoing. If a data node restarts or drops out during snapshot creation, the fallback logic did not load the correct statistics from the repository but just created a status with `0` counts from the snapshot state in the cluster state. Added a fallback to reading from the repository in this case.
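For context, the symptom is visible directly through the snapshot status API: a shard whose stage is `DONE` should always report non-zero totals. Below is a minimal sketch of such a check, reusing only client calls exercised by the new test further down; the class, the helper method, and the repository/snapshot/index names are hypothetical and not part of this commit:

```java
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.client.Client;

/**
 * Hypothetical helper (not part of this commit): checks that a DONE shard of the given
 * index reports real file statistics instead of the zero counts seen before this change.
 */
public final class SnapshotStatsCheck {

    private SnapshotStatsCheck() {}

    public static void assertDoneShardHasStats(Client client, String repoName, String snapshotName, String indexName) {
        // Resolve the status of the named snapshot via the snapshot status API.
        SnapshotStatus status = client.admin().cluster()
            .prepareSnapshotStatus(repoName).setSnapshots(snapshotName)
            .get().getSnapshots().get(0);
        // Look at the first (and, in the test below, only) shard of the index.
        SnapshotIndexShardStatus shard = status.getIndices().get(indexName).getShards().get(0);
        if (shard.getStage() == SnapshotIndexShardStage.DONE) {
            // Before this fix these totals could be 0 when the reporting data node had
            // restarted or when the shard contained no new data to snapshot.
            if (shard.getStats().getTotalFileCount() <= 0 || shard.getStats().getTotalSize() <= 0L) {
                throw new AssertionError("DONE shard unexpectedly reported empty stats");
            }
        }
    }
}
```

Usage: call the helper from an integration test (e.g. with a placeholder repository `test-repo` and snapshot `snap-1`) while a snapshot is running; prior to this fix it would trip for done shards whose data node had restarted.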
@@ -22,10 +22,14 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
@@ -185,4 +189,112 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
        assertThat(found.snapshotId(), is(snapshotInfo.snapshotId()));
        assertThat(found.state(), is(SnapshotState.SUCCESS));
    }

    /**
     * Tests the following sequence of steps:
     * 1. Start snapshot of two shards (both located on separate data nodes).
     * 2. Have one of the shards snapshot completely and the other block
     * 3. Restart the data node that completed its shard snapshot
     * 4. Make sure that snapshot status APIs show correct file-counts and -sizes
     *
     * @throws Exception on failure
     */
    public void testCorrectCountsForDoneShards() throws Exception {
        final String indexOne = "index-1";
        final String indexTwo = "index-2";
        final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
        final String dataNodeOne = dataNodes.get(0);
        final String dataNodeTwo = dataNodes.get(1);

        createIndex(indexOne, singleShardOneNode(dataNodeOne));
        index(indexOne, "_doc", "some_doc_id", "foo", "bar");
        createIndex(indexTwo, singleShardOneNode(dataNodeTwo));
        index(indexTwo, "_doc", "some_doc_id", "foo", "bar");

        final String repoName = "test-repo";
        createRepository(repoName, "mock", randomRepoPath());

        blockDataNode(repoName, dataNodeOne);

        final String snapshotOne = "snap-1";
        // restarting a data node below so using a master client here
        final ActionFuture<CreateSnapshotResponse> responseSnapshotOne = internalCluster().masterClient().admin()
            .cluster().prepareCreateSnapshot(repoName, snapshotOne).setWaitForCompletion(true).execute();

        assertBusy(() -> {
            final SnapshotStatus snapshotStatusOne = getSnapshotStatus(repoName, snapshotOne);
            assertThat(snapshotStatusOne.getState(), is(SnapshotsInProgress.State.STARTED));
            final SnapshotIndexShardStatus snapshotShardState = stateFirstShard(snapshotStatusOne, indexTwo);
            assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
            assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
            assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
        }, 30L, TimeUnit.SECONDS);

        final SnapshotStats snapshotShardStats =
            stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo).getStats();
        final int totalFiles = snapshotShardStats.getTotalFileCount();
        final long totalFileSize = snapshotShardStats.getTotalSize();

        internalCluster().restartNode(dataNodeTwo);

        final SnapshotIndexShardStatus snapshotShardStateAfterNodeRestart =
            stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo);
        assertThat(snapshotShardStateAfterNodeRestart.getStage(), is(SnapshotIndexShardStage.DONE));
        assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(totalFiles));
        assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(totalFileSize));

        unblockAllDataNodes(repoName);
        assertThat(responseSnapshotOne.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));

        // indexing another document to the second index so it will do writes during the snapshot and we can block on those writes
        index(indexTwo, "_doc", "some_other_doc_id", "foo", "other_bar");

        blockDataNode(repoName, dataNodeTwo);

        final String snapshotTwo = "snap-2";
        final ActionFuture<CreateSnapshotResponse> responseSnapshotTwo =
            client().admin().cluster().prepareCreateSnapshot(repoName, snapshotTwo).setWaitForCompletion(true).execute();

        waitForBlock(dataNodeTwo, repoName, TimeValue.timeValueSeconds(30L));

        assertBusy(() -> {
            final SnapshotStatus snapshotStatusOne = getSnapshotStatus(repoName, snapshotOne);
            final SnapshotStatus snapshotStatusTwo = getSnapshotStatus(repoName, snapshotTwo);
            final SnapshotIndexShardStatus snapshotShardStateOne = stateFirstShard(snapshotStatusOne, indexOne);
            final SnapshotIndexShardStatus snapshotShardStateTwo = stateFirstShard(snapshotStatusTwo, indexOne);
            assertThat(snapshotShardStateOne.getStage(), is(SnapshotIndexShardStage.DONE));
            assertThat(snapshotShardStateTwo.getStage(), is(SnapshotIndexShardStage.DONE));
            final int totalFilesShardOne = snapshotShardStateOne.getStats().getTotalFileCount();
            final long totalSizeShardOne = snapshotShardStateOne.getStats().getTotalSize();
            assertThat(totalFilesShardOne, greaterThan(0));
            assertThat(totalSizeShardOne, greaterThan(0L));
            assertThat(totalFilesShardOne, equalTo(snapshotShardStateTwo.getStats().getTotalFileCount()));
            assertThat(totalSizeShardOne, equalTo(snapshotShardStateTwo.getStats().getTotalSize()));
            assertThat(snapshotShardStateTwo.getStats().getIncrementalFileCount(), equalTo(0));
            assertThat(snapshotShardStateTwo.getStats().getIncrementalSize(), equalTo(0L));
        }, 30L, TimeUnit.SECONDS);

        unblockAllDataNodes(repoName);
        assertThat(responseSnapshotTwo.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
    }

    private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
        return snapshotStatus.getIndices().get(indexName).getShards().get(0);
    }

    private static SnapshotStatus getSnapshotStatus(String repoName, String snapshotName) {
        try {
            return client().admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotName)
                .get().getSnapshots().get(0);
        } catch (SnapshotMissingException e) {
            throw new AssertionError(e);
        }
    }

    private static Settings singleShardOneNode(String node) {
        return Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put("index.routing.allocation.include._name", node).build();
    }
}
@@ -164,6 +164,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
        for (SnapshotsInProgress.Entry entry : currentSnapshotEntries) {
            currentSnapshotNames.add(entry.snapshot().getSnapshotId().getName());
            List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
            Map<String, IndexId> indexIdLookup = null;
            for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardEntry : entry.shards()) {
                SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.value;
                if (status.nodeId() != null) {
@@ -181,6 +182,10 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
                        }
                    }
                }
                // We failed to find the status of the shard from the responses we received from data nodes.
                // This can happen if nodes drop out of the cluster completely or restart during the snapshot.
                // We rebuild the information they would have provided from their in memory state from the cluster
                // state and the repository contents in the below logic
                final SnapshotIndexShardStage stage;
                switch (shardEntry.value.state()) {
                    case FAILED:
@@ -198,7 +203,20 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
                    default:
                        throw new IllegalArgumentException("Unknown snapshot state " + shardEntry.value.state());
                }
                SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus(shardEntry.key, stage);
                final SnapshotIndexShardStatus shardStatus;
                if (stage == SnapshotIndexShardStage.DONE) {
                    // Shard snapshot completed successfully so we should be able to load the exact statistics for this
                    // shard from the repository already.
                    if (indexIdLookup == null) {
                        indexIdLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
                    }
                    final ShardId shardId = shardEntry.key;
                    shardStatus = new SnapshotIndexShardStatus(shardId, repositoriesService.repository(entry.repository())
                        .getShardSnapshotStatus(entry.snapshot().getSnapshotId(), indexIdLookup.get(shardId.getIndexName()),
                            shardId).asCopy());
                } else {
                    shardStatus = new SnapshotIndexShardStatus(shardEntry.key, stage);
                }
                shardStatusBuilder.add(shardStatus);
            }
            builder.add(new SnapshotStatus(entry.snapshot(), entry.state(),
@@ -1762,6 +1762,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                        }
                    }
                } else {
                    for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesFromSegmentInfos) {
                        indexTotalNumberOfFiles++;
                        indexTotalFileSize += fileInfo.length();
                    }
                    indexCommitPointFiles = filesFromSegmentInfos;
                }
@@ -1799,6 +1799,13 @@ public final class InternalTestCluster extends TestCluster {
        }
    }

    /**
     * Restarts a node.
     */
    public void restartNode(String nodeName) throws Exception {
        restartNode(nodeName, EMPTY_CALLBACK);
    }

    /**
     * Restarts a node and calls the callback during restart.
     */