Uses TransportMasterNodeAction to update shard snapshot status (#27165)
Currently, we are using a plain TransportRequestHandler to post snapshot status messages to the master. However, it doesn't have a robust retry mechanism as TransportMasterNodeAction. This change migrates from TransportRequestHandler to TransportMasterNodeAction for the new versions and keeps the current implementation for the old versions. Closes #27151
This commit is contained in:
parent
d92afa1e0a
commit
db688e1a17
|
@ -23,14 +23,24 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
|
import org.elasticsearch.action.ActionResponse;
|
||||||
|
import org.elasticsearch.action.support.ActionFilters;
|
||||||
|
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
||||||
|
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateApplier;
|
import org.elasticsearch.cluster.ClusterStateListener;
|
||||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||||
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
|
||||||
|
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
|
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress.State;
|
import org.elasticsearch.cluster.SnapshotsInProgress.State;
|
||||||
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
|
@ -85,9 +95,11 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
|
||||||
* This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
|
* This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
|
||||||
* starting and stopping shard level snapshots
|
* starting and stopping shard level snapshots
|
||||||
*/
|
*/
|
||||||
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateApplier, IndexEventListener {
|
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
|
||||||
|
|
||||||
|
public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot";
|
||||||
|
public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status";
|
||||||
|
|
||||||
public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot";
|
|
||||||
|
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
|
|
||||||
|
@ -106,10 +118,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
private volatile Map<Snapshot, SnapshotShards> shardSnapshots = emptyMap();
|
private volatile Map<Snapshot, SnapshotShards> shardSnapshots = emptyMap();
|
||||||
|
|
||||||
private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
|
private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
|
||||||
|
private UpdateSnapshotStatusAction updateSnapshotStatusHandler;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool,
|
public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool,
|
||||||
TransportService transportService, IndicesService indicesService) {
|
TransportService transportService, IndicesService indicesService,
|
||||||
|
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.indicesService = indicesService;
|
this.indicesService = indicesService;
|
||||||
this.snapshotsService = snapshotsService;
|
this.snapshotsService = snapshotsService;
|
||||||
|
@ -118,20 +132,27 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
if (DiscoveryNode.isDataNode(settings)) {
|
if (DiscoveryNode.isDataNode(settings)) {
|
||||||
// this is only useful on the nodes that can hold data
|
// this is only useful on the nodes that can hold data
|
||||||
// addLowPriorityApplier to make sure that Repository will be created before snapshot
|
clusterService.addListener(this);
|
||||||
clusterService.addLowPriorityApplier(this);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
|
||||||
|
this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
|
||||||
|
transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver);
|
||||||
|
|
||||||
if (DiscoveryNode.isMasterNode(settings)) {
|
if (DiscoveryNode.isMasterNode(settings)) {
|
||||||
// This needs to run only on nodes that can become masters
|
// This needs to run only on nodes that can become masters
|
||||||
transportService.registerRequestHandler(UPDATE_SNAPSHOT_ACTION_NAME, UpdateIndexShardSnapshotStatusRequest::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandler());
|
transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStart() {
|
protected void doStart() {
|
||||||
|
assert this.updateSnapshotStatusHandler != null;
|
||||||
|
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
|
||||||
|
if (DiscoveryNode.isMasterNode(settings)) {
|
||||||
|
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -151,11 +172,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() {
|
protected void doClose() {
|
||||||
clusterService.removeApplier(this);
|
clusterService.removeListener(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void applyClusterState(ClusterChangedEvent event) {
|
public void clusterChanged(ClusterChangedEvent event) {
|
||||||
try {
|
try {
|
||||||
SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE);
|
SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE);
|
||||||
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);
|
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);
|
||||||
|
@ -449,7 +470,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
/**
|
/**
|
||||||
* Internal request that is used to send changes in snapshot status to master
|
* Internal request that is used to send changes in snapshot status to master
|
||||||
*/
|
*/
|
||||||
public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest {
|
public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest<UpdateIndexShardSnapshotStatusRequest> {
|
||||||
private Snapshot snapshot;
|
private Snapshot snapshot;
|
||||||
private ShardId shardId;
|
private ShardId shardId;
|
||||||
private ShardSnapshotStatus status;
|
private ShardSnapshotStatus status;
|
||||||
|
@ -462,6 +483,13 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
this.snapshot = snapshot;
|
this.snapshot = snapshot;
|
||||||
this.shardId = shardId;
|
this.shardId = shardId;
|
||||||
this.status = status;
|
this.status = status;
|
||||||
|
// By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck.
|
||||||
|
this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ActionRequestValidationException validate() {
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -502,11 +530,16 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
* Updates the shard status
|
* Updates the shard status
|
||||||
*/
|
*/
|
||||||
public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) {
|
public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) {
|
||||||
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
|
|
||||||
try {
|
try {
|
||||||
transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
|
if (master.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
|
||||||
|
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
|
||||||
|
transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||||
|
} else {
|
||||||
|
UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status);
|
||||||
|
transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e);
|
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -515,15 +548,24 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
*
|
*
|
||||||
* @param request update shard status request
|
* @param request update shard status request
|
||||||
*/
|
*/
|
||||||
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) {
|
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
|
||||||
logger.trace("received updated snapshot restore state [{}]", request);
|
logger.trace("received updated snapshot restore state [{}]", request);
|
||||||
clusterService.submitStateUpdateTask(
|
clusterService.submitStateUpdateTask(
|
||||||
"update snapshot state",
|
"update snapshot state",
|
||||||
request,
|
request,
|
||||||
ClusterStateTaskConfig.build(Priority.NORMAL),
|
ClusterStateTaskConfig.build(Priority.NORMAL),
|
||||||
snapshotStateExecutor,
|
snapshotStateExecutor,
|
||||||
(source, e) -> logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]",
|
new ClusterStateTaskListener() {
|
||||||
request.snapshot(), request.shardId(), request.status()), e));
|
@Override
|
||||||
|
public void onFailure(String source, Exception e) {
|
||||||
|
listener.onFailure(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||||
|
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {
|
class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {
|
||||||
|
@ -578,13 +620,107 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {
|
||||||
* Transport request handler that is used to send changes in snapshot status to master
|
|
||||||
*/
|
}
|
||||||
class UpdateSnapshotStateRequestHandler implements TransportRequestHandler<UpdateIndexShardSnapshotStatusRequest> {
|
|
||||||
|
class UpdateSnapshotStatusAction extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {
|
||||||
|
UpdateSnapshotStatusAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
|
||||||
|
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||||
|
super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void messageReceived(UpdateIndexShardSnapshotStatusRequest request, final TransportChannel channel) throws Exception {
|
protected String executor() {
|
||||||
innerUpdateSnapshotState(request);
|
return ThreadPool.Names.SAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected UpdateIndexShardSnapshotStatusResponse newResponse() {
|
||||||
|
return new UpdateIndexShardSnapshotStatusResponse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) throws Exception {
|
||||||
|
innerUpdateSnapshotState(request, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A BWC version of {@link UpdateIndexShardSnapshotStatusRequest}
|
||||||
|
*/
|
||||||
|
static class UpdateSnapshotStatusRequestV6 extends TransportRequest {
|
||||||
|
private Snapshot snapshot;
|
||||||
|
private ShardId shardId;
|
||||||
|
private ShardSnapshotStatus status;
|
||||||
|
|
||||||
|
UpdateSnapshotStatusRequestV6() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
|
||||||
|
this.snapshot = snapshot;
|
||||||
|
this.shardId = shardId;
|
||||||
|
this.status = status;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
|
super.readFrom(in);
|
||||||
|
snapshot = new Snapshot(in);
|
||||||
|
shardId = ShardId.readShardId(in);
|
||||||
|
status = new ShardSnapshotStatus(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
super.writeTo(out);
|
||||||
|
snapshot.writeTo(out);
|
||||||
|
shardId.writeTo(out);
|
||||||
|
status.writeTo(out);
|
||||||
|
}
|
||||||
|
|
||||||
|
Snapshot snapshot() {
|
||||||
|
return snapshot;
|
||||||
|
}
|
||||||
|
|
||||||
|
ShardId shardId() {
|
||||||
|
return shardId;
|
||||||
|
}
|
||||||
|
|
||||||
|
ShardSnapshotStatus status() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A BWC version of {@link UpdateSnapshotStatusAction}
|
||||||
|
*/
|
||||||
|
class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler<UpdateSnapshotStatusRequestV6> {
|
||||||
|
@Override
|
||||||
|
public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception {
|
||||||
|
final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status());
|
||||||
|
innerUpdateSnapshotState(request, new ActionListener<UpdateIndexShardSnapshotStatusResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
logger.warn("Failed to update snapshot status", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,115 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.snapshots;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||||
|
import org.elasticsearch.plugins.Plugin;
|
||||||
|
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
||||||
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||||
|
import org.elasticsearch.test.transport.MockTransportService;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.everyItem;
|
||||||
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
|
|
||||||
|
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
|
||||||
|
public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||||
|
return Arrays.asList(MockRepository.Plugin.class, MockTransportService.TestPlugin.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRetryPostingSnapshotStatusMessages() throws Exception {
|
||||||
|
String masterNode = internalCluster().startMasterOnlyNode();
|
||||||
|
String dataNode = internalCluster().startDataOnlyNode();
|
||||||
|
|
||||||
|
logger.info("--> creating repository");
|
||||||
|
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||||
|
.setType("mock").setSettings(Settings.builder()
|
||||||
|
.put("location", randomRepoPath())
|
||||||
|
.put("compress", randomBoolean())
|
||||||
|
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||||
|
|
||||||
|
final int shards = between(1, 10);
|
||||||
|
assertAcked(prepareCreate("test-index", 0, Settings.builder().put("number_of_shards", shards).put("number_of_replicas", 0)));
|
||||||
|
ensureGreen();
|
||||||
|
final int numDocs = scaledRandomIntBetween(50, 100);
|
||||||
|
for (int i = 0; i < numDocs; i++) {
|
||||||
|
index("test-index", "doc", Integer.toString(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("--> blocking repository");
|
||||||
|
String blockedNode = blockNodeWithIndex("test-repo", "test-index");
|
||||||
|
dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||||
|
.setWaitForCompletion(false)
|
||||||
|
.setIndices("test-index")
|
||||||
|
.get();
|
||||||
|
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
|
||||||
|
|
||||||
|
final SnapshotId snapshotId = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")
|
||||||
|
.get().getSnapshots().get(0).snapshotId();
|
||||||
|
|
||||||
|
logger.info("--> start disrupting cluster");
|
||||||
|
final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(masterNode, dataNode),
|
||||||
|
NetworkDisruption.NetworkDelay.random(random()));
|
||||||
|
internalCluster().setDisruptionScheme(networkDisruption);
|
||||||
|
networkDisruption.startDisrupting();
|
||||||
|
|
||||||
|
logger.info("--> unblocking repository");
|
||||||
|
unblockNode("test-repo", blockedNode);
|
||||||
|
|
||||||
|
// Retrieve snapshot status from the data node.
|
||||||
|
SnapshotShardsService snapshotShardsService = internalCluster().getInstance(SnapshotShardsService.class, blockedNode);
|
||||||
|
assertBusy(() -> {
|
||||||
|
final Snapshot snapshot = new Snapshot("test-repo", snapshotId);
|
||||||
|
List<IndexShardSnapshotStatus.Stage> stages = snapshotShardsService.currentSnapshotShards(snapshot)
|
||||||
|
.values().stream().map(IndexShardSnapshotStatus::stage).collect(Collectors.toList());
|
||||||
|
assertThat(stages, hasSize(shards));
|
||||||
|
assertThat(stages, everyItem(equalTo(IndexShardSnapshotStatus.Stage.DONE)));
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info("--> stop disrupting cluster");
|
||||||
|
networkDisruption.stopDisrupting();
|
||||||
|
internalCluster().clearDisruptionScheme(true);
|
||||||
|
|
||||||
|
assertBusy(() -> {
|
||||||
|
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
|
||||||
|
.prepareGetSnapshots("test-repo")
|
||||||
|
.setSnapshots("test-snap").get();
|
||||||
|
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
|
||||||
|
logger.info("Snapshot status [{}], successfulShards [{}]", snapshotInfo.state(), snapshotInfo.successfulShards());
|
||||||
|
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
|
||||||
|
assertThat(snapshotInfo.successfulShards(), equalTo(shards));
|
||||||
|
}, 10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
|
@ -57,6 +57,11 @@ for (Version version : wireCompatVersions) {
|
||||||
if (project.bwc_tests_enabled) {
|
if (project.bwc_tests_enabled) {
|
||||||
bwcTest.dependsOn(versionBwcTest)
|
bwcTest.dependsOn(versionBwcTest)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* To support taking index snapshots, we have to set path.repo setting */
|
||||||
|
tasks.getByName("${baseName}#mixedClusterTestRunner").configure {
|
||||||
|
systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test.enabled = false // no unit tests for rolling upgrades, only the rest integration test
|
test.enabled = false // no unit tests for rolling upgrades, only the rest integration test
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.client.RestClient;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||||
import org.elasticsearch.test.rest.yaml.ObjectPath;
|
import org.elasticsearch.test.rest.yaml.ObjectPath;
|
||||||
|
@ -42,7 +43,9 @@ import java.util.stream.Collectors;
|
||||||
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
|
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength;
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
import static java.util.Collections.singletonMap;
|
import static java.util.Collections.singletonMap;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.not;
|
||||||
|
|
||||||
public class IndexingIT extends ESRestTestCase {
|
public class IndexingIT extends ESRestTestCase {
|
||||||
|
|
||||||
|
@ -237,6 +240,57 @@ public class IndexingIT extends ESRestTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testUpdateSnapshotStatus() throws Exception {
|
||||||
|
Nodes nodes = buildNodeAndVersions();
|
||||||
|
assertThat(nodes.getNewNodes(), not(empty()));
|
||||||
|
logger.info("cluster discovered: {}", nodes.toString());
|
||||||
|
|
||||||
|
// Create the repository before taking the snapshot.
|
||||||
|
String repoConfig = JsonXContent.contentBuilder()
|
||||||
|
.startObject()
|
||||||
|
.field("type", "fs")
|
||||||
|
.startObject("settings")
|
||||||
|
.field("compress", randomBoolean())
|
||||||
|
.field("location", System.getProperty("tests.path.repo"))
|
||||||
|
.endObject()
|
||||||
|
.endObject()
|
||||||
|
.string();
|
||||||
|
|
||||||
|
assertOK(
|
||||||
|
client().performRequest("PUT", "/_snapshot/repo", emptyMap(),
|
||||||
|
new StringEntity(repoConfig, ContentType.APPLICATION_JSON))
|
||||||
|
);
|
||||||
|
|
||||||
|
String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
|
||||||
|
|
||||||
|
// Allocating shards on the BWC nodes to makes sure that taking snapshot happens on those nodes.
|
||||||
|
Settings.Builder settings = Settings.builder()
|
||||||
|
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(5, 10))
|
||||||
|
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
|
||||||
|
.put("index.routing.allocation.include._name", bwcNames);
|
||||||
|
|
||||||
|
final String index = "test-snapshot-index";
|
||||||
|
createIndex(index, settings.build());
|
||||||
|
indexDocs(index, 0, between(50, 100));
|
||||||
|
ensureGreen();
|
||||||
|
assertOK(client().performRequest("POST", index + "/_refresh"));
|
||||||
|
|
||||||
|
assertOK(
|
||||||
|
client().performRequest("PUT", "/_snapshot/repo/bwc-snapshot", singletonMap("wait_for_completion", "true"),
|
||||||
|
new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Allocating shards on all nodes, taking snapshots should happen on all nodes.
|
||||||
|
updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name"));
|
||||||
|
ensureGreen();
|
||||||
|
assertOK(client().performRequest("POST", index + "/_refresh"));
|
||||||
|
|
||||||
|
assertOK(
|
||||||
|
client().performRequest("PUT", "/_snapshot/repo/mixed-snapshot", singletonMap("wait_for_completion", "true"),
|
||||||
|
new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
|
private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
|
||||||
final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
|
final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference));
|
||||||
assertOK(response);
|
assertOK(response);
|
||||||
|
|
Loading…
Reference in New Issue