From db688e1a174b8a732b999426048473437411b478 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 17 Nov 2017 11:54:44 -0500 Subject: [PATCH] Uses TransportMasterNodeAction to update shard snapshot status (#27165) Currently, we are using a plain TransportRequestHandler to post snapshot status messages to the master. However, it doesn't have a robust retry mechanism as TransportMasterNodeAction. This change migrates from TransportRequestHandler to TransportMasterNodeAction for the new versions and keeps the current implementation for the old versions. Closes #27151 --- .../snapshots/SnapshotShardsService.java | 182 +++++++++++++++--- .../snapshots/SnapshotShardsServiceIT.java | 115 +++++++++++ qa/mixed-cluster/build.gradle | 5 + .../elasticsearch/backwards/IndexingIT.java | 54 ++++++ 4 files changed, 333 insertions(+), 23 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 259136ca9cc..f8a601cc41f 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -23,14 +23,24 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.State; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -85,9 +95,11 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed; * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for * starting and stopping shard level snapshots */ -public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateApplier, IndexEventListener { +public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { + + public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6 = "internal:cluster/snapshot/update_snapshot"; + public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; - public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot"; private final ClusterService clusterService; @@ -106,10 +118,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private volatile Map shardSnapshots = emptyMap(); private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); + private UpdateSnapshotStatusAction updateSnapshotStatusHandler; @Inject public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool, - TransportService transportService, IndicesService indicesService) { + TransportService transportService, IndicesService indicesService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings); this.indicesService = indicesService; this.snapshotsService = snapshotsService; @@ -118,20 +132,27 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements this.threadPool = threadPool; if (DiscoveryNode.isDataNode(settings)) { // this is only useful on the nodes that can hold data - // addLowPriorityApplier to make sure that Repository will be created before snapshot - clusterService.addLowPriorityApplier(this); + clusterService.addListener(this); } + // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. + this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, + transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); + if (DiscoveryNode.isMasterNode(settings)) { // This needs to run only on nodes that can become masters - transportService.registerRequestHandler(UPDATE_SNAPSHOT_ACTION_NAME, UpdateIndexShardSnapshotStatusRequest::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandler()); + transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6()); } } @Override protected void doStart() { - + assert this.updateSnapshotStatusHandler != null; + assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; + if (DiscoveryNode.isMasterNode(settings)) { + assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6) != null; + } } @Override @@ -151,11 +172,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements @Override protected void doClose() { - clusterService.removeApplier(this); + clusterService.removeListener(this); } @Override - public void applyClusterState(ClusterChangedEvent event) { + public void clusterChanged(ClusterChangedEvent event) { try { SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE); SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); @@ -449,7 +470,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements /** * Internal request that is used to send changes in snapshot status to master */ - public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest { + public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest { private Snapshot snapshot; private ShardId shardId; private ShardSnapshotStatus status; @@ -462,6 +483,13 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements this.snapshot = snapshot; this.shardId = shardId; this.status = status; + // By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck. + this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE); + } + + @Override + public ActionRequestValidationException validate() { + return null; } @Override @@ -502,11 +530,16 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements * Updates the shard status */ public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode master) { - UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); try { - transportService.sendRequest(master, UPDATE_SNAPSHOT_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + if (master.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); + transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + } else { + UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status); + transportService.sendRequest(master, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, EmptyTransportResponseHandler.INSTANCE_SAME); + } } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", request.snapshot(), request.status()), e); + logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); } } @@ -515,15 +548,24 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements * * @param request update shard status request */ - private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) { + private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, ActionListener listener) { logger.trace("received updated snapshot restore state [{}]", request); clusterService.submitStateUpdateTask( "update snapshot state", request, ClusterStateTaskConfig.build(Priority.NORMAL), snapshotStateExecutor, - (source, e) -> logger.warn((Supplier) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", - request.snapshot(), request.shardId(), request.status()), e)); + new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new UpdateIndexShardSnapshotStatusResponse()); + } + }); } class SnapshotStateExecutor implements ClusterStateTaskExecutor { @@ -578,13 +620,107 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } } - /** - * Transport request handler that is used to send changes in snapshot status to master - */ - class UpdateSnapshotStateRequestHandler implements TransportRequestHandler { + static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { + + } + + class UpdateSnapshotStatusAction extends TransportMasterNodeAction { + UpdateSnapshotStatusAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new); + } + @Override - public void messageReceived(UpdateIndexShardSnapshotStatusRequest request, final TransportChannel channel) throws Exception { - innerUpdateSnapshotState(request); + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected UpdateIndexShardSnapshotStatusResponse newResponse() { + return new UpdateIndexShardSnapshotStatusResponse(); + } + + @Override + protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, ActionListener listener) throws Exception { + innerUpdateSnapshotState(request, listener); + } + + @Override + protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) { + return null; + } + } + + /** + * A BWC version of {@link UpdateIndexShardSnapshotStatusRequest} + */ + static class UpdateSnapshotStatusRequestV6 extends TransportRequest { + private Snapshot snapshot; + private ShardId shardId; + private ShardSnapshotStatus status; + + UpdateSnapshotStatusRequestV6() { + + } + + UpdateSnapshotStatusRequestV6(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { + this.snapshot = snapshot; + this.shardId = shardId; + this.status = status; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + snapshot = new Snapshot(in); + shardId = ShardId.readShardId(in); + status = new ShardSnapshotStatus(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + snapshot.writeTo(out); + shardId.writeTo(out); + status.writeTo(out); + } + + Snapshot snapshot() { + return snapshot; + } + + ShardId shardId() { + return shardId; + } + + ShardSnapshotStatus status() { + return status; + } + + @Override + public String toString() { + return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; + } + } + + /** + * A BWC version of {@link UpdateSnapshotStatusAction} + */ + class UpdateSnapshotStateRequestHandlerV6 implements TransportRequestHandler { + @Override + public void messageReceived(UpdateSnapshotStatusRequestV6 requestV6, final TransportChannel channel) throws Exception { + final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(requestV6.snapshot(), requestV6.shardId(), requestV6.status()); + innerUpdateSnapshotState(request, new ActionListener() { + @Override + public void onResponse(UpdateIndexShardSnapshotStatusResponse updateSnapshotStatusResponse) { + + } + + @Override + public void onFailure(Exception e) { + logger.warn("Failed to update snapshot status", e); + } + }); channel.sendResponse(TransportResponse.Empty.INSTANCE); } } diff --git a/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java new file mode 100644 index 00000000000..651cd96776e --- /dev/null +++ b/core/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.hasSize; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockRepository.Plugin.class, MockTransportService.TestPlugin.class); + } + + public void testRetryPostingSnapshotStatusMessages() throws Exception { + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + final int shards = between(1, 10); + assertAcked(prepareCreate("test-index", 0, Settings.builder().put("number_of_shards", shards).put("number_of_replicas", 0))); + ensureGreen(); + final int numDocs = scaledRandomIntBetween(50, 100); + for (int i = 0; i < numDocs; i++) { + index("test-index", "doc", Integer.toString(i)); + } + + logger.info("--> blocking repository"); + String blockedNode = blockNodeWithIndex("test-repo", "test-index"); + dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + .setWaitForCompletion(false) + .setIndices("test-index") + .get(); + waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60)); + + final SnapshotId snapshotId = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap") + .get().getSnapshots().get(0).snapshotId(); + + logger.info("--> start disrupting cluster"); + final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(masterNode, dataNode), + NetworkDisruption.NetworkDelay.random(random())); + internalCluster().setDisruptionScheme(networkDisruption); + networkDisruption.startDisrupting(); + + logger.info("--> unblocking repository"); + unblockNode("test-repo", blockedNode); + + // Retrieve snapshot status from the data node. + SnapshotShardsService snapshotShardsService = internalCluster().getInstance(SnapshotShardsService.class, blockedNode); + assertBusy(() -> { + final Snapshot snapshot = new Snapshot("test-repo", snapshotId); + List stages = snapshotShardsService.currentSnapshotShards(snapshot) + .values().stream().map(IndexShardSnapshotStatus::stage).collect(Collectors.toList()); + assertThat(stages, hasSize(shards)); + assertThat(stages, everyItem(equalTo(IndexShardSnapshotStatus.Stage.DONE))); + }); + + logger.info("--> stop disrupting cluster"); + networkDisruption.stopDisrupting(); + internalCluster().clearDisruptionScheme(true); + + assertBusy(() -> { + GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster() + .prepareGetSnapshots("test-repo") + .setSnapshots("test-snap").get(); + SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + logger.info("Snapshot status [{}], successfulShards [{}]", snapshotInfo.state(), snapshotInfo.successfulShards()); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), equalTo(shards)); + }, 10, TimeUnit.SECONDS); + } +} diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index 59a6dfece52..66cad0c6eb6 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -57,6 +57,11 @@ for (Version version : wireCompatVersions) { if (project.bwc_tests_enabled) { bwcTest.dependsOn(versionBwcTest) } + + /* To support taking index snapshots, we have to set path.repo setting */ + tasks.getByName("${baseName}#mixedClusterTestRunner").configure { + systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo") + } } test.enabled = false // no unit tests for rolling upgrades, only the rest integration test diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java index f744b3029b1..9de8954c531 100644 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.yaml.ObjectPath; @@ -42,7 +43,9 @@ import java.util.stream.Collectors; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; public class IndexingIT extends ESRestTestCase { @@ -237,6 +240,57 @@ public class IndexingIT extends ESRestTestCase { } } + public void testUpdateSnapshotStatus() throws Exception { + Nodes nodes = buildNodeAndVersions(); + assertThat(nodes.getNewNodes(), not(empty())); + logger.info("cluster discovered: {}", nodes.toString()); + + // Create the repository before taking the snapshot. + String repoConfig = JsonXContent.contentBuilder() + .startObject() + .field("type", "fs") + .startObject("settings") + .field("compress", randomBoolean()) + .field("location", System.getProperty("tests.path.repo")) + .endObject() + .endObject() + .string(); + + assertOK( + client().performRequest("PUT", "/_snapshot/repo", emptyMap(), + new StringEntity(repoConfig, ContentType.APPLICATION_JSON)) + ); + + String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(",")); + + // Allocating shards on the BWC nodes to makes sure that taking snapshot happens on those nodes. + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(5, 10)) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put("index.routing.allocation.include._name", bwcNames); + + final String index = "test-snapshot-index"; + createIndex(index, settings.build()); + indexDocs(index, 0, between(50, 100)); + ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + + assertOK( + client().performRequest("PUT", "/_snapshot/repo/bwc-snapshot", singletonMap("wait_for_completion", "true"), + new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON)) + ); + + // Allocating shards on all nodes, taking snapshots should happen on all nodes. + updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); + ensureGreen(); + assertOK(client().performRequest("POST", index + "/_refresh")); + + assertOK( + client().performRequest("PUT", "/_snapshot/repo/mixed-snapshot", singletonMap("wait_for_completion", "true"), + new StringEntity("{\"indices\": \"" + index + "\"}", ContentType.APPLICATION_JSON)) + ); + } + private void assertCount(final String index, final String preference, final int expectedCount) throws IOException { final Response response = client().performRequest("GET", index + "/_count", Collections.singletonMap("preference", preference)); assertOK(response);