This fixes missing to marking shard snapshots as failures when multiple data-nodes are lost during the snapshot process or shard snapshot failures have occured before a node left the cluster. The problem was that we were simply not adding any shard entries for completed shards on node-left events. This has no effect for a successful shard, but for a failed shard would lead to that shard not being marked as failed during snapshot finalization. Fixed by corectly keeping track of all previous completed shard states as well in this case. Also, added an assertion that without this fix would trip on almost every run of the resiliency tests and adjusted the serialization of SnapshotsInProgress.Entry so we have a proper assertion message. Closes #47550
This commit is contained in:
parent
f2d2ca21e2
commit
22679c7932
|
@ -25,6 +25,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState.Custom;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -76,7 +77,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
return builder.append("]").toString();
|
||||
}
|
||||
|
||||
public static class Entry {
|
||||
public static class Entry implements ToXContent {
|
||||
private final State state;
|
||||
private final Snapshot snapshot;
|
||||
private final boolean includeGlobalState;
|
||||
|
@ -210,7 +211,50 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return snapshot.toString();
|
||||
return Strings.toString(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(REPOSITORY, snapshot.getRepository());
|
||||
builder.field(SNAPSHOT, snapshot.getSnapshotId().getName());
|
||||
builder.field(UUID, snapshot.getSnapshotId().getUUID());
|
||||
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState());
|
||||
builder.field(PARTIAL, partial);
|
||||
builder.field(STATE, state);
|
||||
builder.startArray(INDICES);
|
||||
{
|
||||
for (IndexId index : indices) {
|
||||
index.toXContent(builder, params);
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(startTime));
|
||||
builder.field(REPOSITORY_STATE_ID, repositoryStateId);
|
||||
builder.startArray(SHARDS);
|
||||
{
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : shards) {
|
||||
ShardId shardId = shardEntry.key;
|
||||
ShardSnapshotStatus status = shardEntry.value;
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field(INDEX, shardId.getIndex());
|
||||
builder.field(SHARD, shardId.getId());
|
||||
builder.field(STATE, status.state());
|
||||
builder.field(NODE, status.nodeId());
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return false;
|
||||
}
|
||||
|
||||
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
|
||||
|
@ -507,48 +551,12 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startArray(SNAPSHOTS);
|
||||
for (Entry entry : entries) {
|
||||
toXContent(entry, builder, params);
|
||||
entry.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
|
||||
public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(REPOSITORY, entry.snapshot().getRepository());
|
||||
builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName());
|
||||
builder.field(UUID, entry.snapshot().getSnapshotId().getUUID());
|
||||
builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState());
|
||||
builder.field(PARTIAL, entry.partial());
|
||||
builder.field(STATE, entry.state());
|
||||
builder.startArray(INDICES);
|
||||
{
|
||||
for (IndexId index : entry.indices()) {
|
||||
index.toXContent(builder, params);
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime()));
|
||||
builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId());
|
||||
builder.startArray(SHARDS);
|
||||
{
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards) {
|
||||
ShardId shardId = shardEntry.key;
|
||||
ShardSnapshotStatus status = shardEntry.value;
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field(INDEX, shardId.getIndex());
|
||||
builder.field(SHARD, shardId.getId());
|
||||
builder.field(STATE, status.state());
|
||||
builder.field(NODE, status.nodeId());
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
public enum ShardState {
|
||||
INIT((byte) 0, false, false),
|
||||
SUCCESS((byte) 2, true, false),
|
||||
|
|
|
@ -792,18 +792,21 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
|
||||
boolean snapshotChanged = false;
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards()) {
|
||||
ShardSnapshotStatus shardStatus = shardEntry.value;
|
||||
final ShardSnapshotStatus shardStatus = shardEntry.value;
|
||||
final ShardId shardId = shardEntry.key;
|
||||
if (!shardStatus.state().completed() && shardStatus.nodeId() != null) {
|
||||
if (nodes.nodeExists(shardStatus.nodeId())) {
|
||||
shards.put(shardEntry.key, shardEntry.value);
|
||||
shards.put(shardId, shardStatus);
|
||||
} else {
|
||||
// TODO: Restart snapshot on another node?
|
||||
snapshotChanged = true;
|
||||
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
|
||||
shardEntry.key, shardStatus.nodeId());
|
||||
shards.put(shardEntry.key,
|
||||
shardId, shardStatus.nodeId());
|
||||
shards.put(shardId,
|
||||
new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown"));
|
||||
}
|
||||
} else {
|
||||
shards.put(shardId, shardStatus);
|
||||
}
|
||||
}
|
||||
if (snapshotChanged) {
|
||||
|
@ -835,6 +838,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
}, updatedSnapshot.getRepositoryStateId(), false);
|
||||
}
|
||||
assert updatedSnapshot.shards().size() == snapshot.shards().size()
|
||||
: "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]";
|
||||
}
|
||||
if (changed) {
|
||||
return ClusterState.builder(currentState)
|
||||
|
|
Loading…
Reference in New Issue