Use general cluster state batching mechanism for snapshot state updates (#22528)

Relates to #14899
This commit is contained in:
Yannick Welsch 2017-01-10 17:54:49 +01:00 committed by GitHub
parent c35277e623
commit 1cbb97d361
1 changed file with 50 additions and 79 deletions

View File

@ -27,13 +27,15 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -43,7 +45,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.SnapshotFailedEngineException; import org.elasticsearch.index.engine.SnapshotFailedEngineException;
import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
@ -68,7 +69,6 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
@ -105,8 +105,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
private volatile Map<Snapshot, SnapshotShards> shardSnapshots = emptyMap(); private volatile Map<Snapshot, SnapshotShards> shardSnapshots = emptyMap();
private final BlockingQueue<UpdateIndexShardSnapshotStatusRequest> updatedSnapshotStateQueue = ConcurrentCollections.newBlockingQueue(); private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
@Inject @Inject
public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool, public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool,
@ -458,8 +457,6 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
private ShardId shardId; private ShardId shardId;
private ShardSnapshotStatus status; private ShardSnapshotStatus status;
private volatile boolean processed; // state field, no need to serialize
public UpdateIndexShardSnapshotStatusRequest() { public UpdateIndexShardSnapshotStatusRequest() {
} }
@ -502,14 +499,6 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
public String toString() { public String toString() {
return "" + snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; return "" + snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]";
} }
public void markAsProcessed() {
processed = true;
}
public boolean isProcessed() {
return processed;
}
} }
/** /**
@ -531,83 +520,65 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
*/ */
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) { private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) {
logger.trace("received updated snapshot restore state [{}]", request); logger.trace("received updated snapshot restore state [{}]", request);
updatedSnapshotStateQueue.add(request); clusterService.submitStateUpdateTask(
"update snapshot state",
request,
ClusterStateTaskConfig.build(Priority.NORMAL),
snapshotStateExecutor,
(source, e) -> logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]",
request.snapshot(), request.shardId(), request.status()), e));
}
clusterService.submitStateUpdateTask("update snapshot state", new ClusterStateUpdateTask() { class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {
private final List<UpdateIndexShardSnapshotStatusRequest> drainedRequests = new ArrayList<>();
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest> execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) throws Exception {
// The request was already processed as a part of an early batch - skipping final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (request.isProcessed()) { if (snapshots != null) {
return currentState; int changedCount = 0;
} final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;
updatedSnapshotStateQueue.drainTo(drainedRequests); for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) {
if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
final int batchSize = drainedRequests.size(); logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state());
if (updated == false) {
// nothing to process (a previous event has processed it already) shards.putAll(entry.shards());
if (batchSize == 0) { updated = true;
return currentState;
}
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
int changedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;
for (int i = 0; i < batchSize; i++) {
final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = drainedRequests.get(i);
updateSnapshotState.markAsProcessed();
if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state());
if (updated == false) {
shards.putAll(entry.shards());
updated = true;
}
shards.put(updateSnapshotState.shardId(), updateSnapshotState.status());
changedCount++;
} }
shards.put(updateSnapshotState.shardId(), updateSnapshotState.status());
changedCount++;
} }
}
if (updated) { if (updated) {
if (completed(shards.values()) == false) { if (completed(shards.values()) == false) {
entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); entries.add(new SnapshotsInProgress.Entry(entry, shards.build()));
} else {
// Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build());
entries.add(updatedEntry);
// Finalize snapshot in the repository
snapshotsService.endSnapshot(updatedEntry);
logger.info("snapshot [{}] is done", updatedEntry.snapshot());
}
} else { } else {
entries.add(entry); // Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build());
entries.add(updatedEntry);
// Finalize snapshot in the repository
snapshotsService.endSnapshot(updatedEntry);
logger.info("snapshot [{}] is done", updatedEntry.snapshot());
} }
} } else {
if (changedCount > 0) { entries.add(entry);
logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build();
} }
} }
return currentState; if (changedCount > 0) {
} logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
@Override final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
public void onFailure(String source, Exception e) { return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(
for (UpdateIndexShardSnapshotStatusRequest request : drainedRequests) { ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build());
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", request.snapshot(), request.shardId(), request.status()), e);
} }
} }
}); return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);
}
} }
/** /**