diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java index df1e5dc01ae..526db2a86a7 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java @@ -46,6 +46,8 @@ final class CcrRequestConverters { .addPathPartAsIs("_ccr", "follow") .build(); Request request = new Request(HttpPut.METHOD_NAME, endpoint); + RequestConverters.Params parameters = new RequestConverters.Params(request); + parameters.withWaitForActiveShards(putFollowRequest.waitForActiveShards()); request.setEntity(createEntity(putFollowRequest, REQUEST_BODY_CONTENT_TYPE)); return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java index 98e9d224564..8307b04bd70 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.client.ccr; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Validatable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -36,11 +37,17 @@ public final class PutFollowRequest extends FollowConfig implements Validatable, private final String remoteCluster; private final String leaderIndex; private final String followerIndex; + private final ActiveShardCount waitForActiveShards; public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex) { + this(remoteCluster, leaderIndex, followerIndex, ActiveShardCount.NONE); + } + + public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex, ActiveShardCount waitForActiveShards) { this.remoteCluster = Objects.requireNonNull(remoteCluster, "remoteCluster"); this.leaderIndex = Objects.requireNonNull(leaderIndex, "leaderIndex"); this.followerIndex = Objects.requireNonNull(followerIndex, "followerIndex"); + this.waitForActiveShards = waitForActiveShards; } @Override @@ -66,13 +73,18 @@ public final class PutFollowRequest extends FollowConfig implements Validatable, return followerIndex; } + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; PutFollowRequest that = (PutFollowRequest) o; - return Objects.equals(remoteCluster, that.remoteCluster) && + return Objects.equals(waitForActiveShards, that.waitForActiveShards) && + Objects.equals(remoteCluster, that.remoteCluster) && Objects.equals(leaderIndex, that.leaderIndex) && Objects.equals(followerIndex, that.followerIndex); } @@ -83,7 +95,7 @@ public final class PutFollowRequest extends FollowConfig implements Validatable, super.hashCode(), remoteCluster, leaderIndex, - followerIndex - ); + followerIndex, + waitForActiveShards); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java index 97a379aa16a..ee2685dee6d 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.ccr.CcrStatsRequest; import org.elasticsearch.client.ccr.CcrStatsResponse; @@ -95,7 +96,7 @@ public class CCRIT extends ESRestHighLevelClientTestCase { CreateIndexResponse response = highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT); assertThat(response.isAcknowledged(), is(true)); - PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower"); + PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE); PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java index 9ef01a1a6f7..2e54d1c4a1a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; @@ -97,7 +98,8 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase { PutFollowRequest putFollowRequest = new PutFollowRequest( "local", // <1> "leader", // <2> - "follower" // <3> + "follower", // <3> + ActiveShardCount.ONE // <4> ); // end::ccr-put-follow-request @@ -175,7 +177,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase { String followIndex = "follower"; // Follow index, so that it can be paused: { - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); @@ -241,7 +243,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase { String followIndex = "follower"; // Follow index, so that it can be paused: { - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); @@ -317,7 +319,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase { String followIndex = "follower"; // Follow index, pause and close, so that it can be unfollowed: { - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); @@ -349,7 +351,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(followIndex); assertThat(client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT).isAcknowledged(), is(true)); - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); @@ -639,7 +641,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase { } { // Follow index, so that we can query for follow stats: - PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower"); + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower", ActiveShardCount.ONE); PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); diff --git a/docs/java-rest/high-level/ccr/put_follow.asciidoc b/docs/java-rest/high-level/ccr/put_follow.asciidoc index 839a9bedc65..2f40bbd5d2b 100644 --- a/docs/java-rest/high-level/ccr/put_follow.asciidoc +++ b/docs/java-rest/high-level/ccr/put_follow.asciidoc @@ -20,6 +20,8 @@ include-tagged::{doc-tests-file}[{api}-request] <1> The name of the remote cluster alias. <2> The name of the leader in the remote cluster. <3> The name of the follower index that gets created as part of the put follow API call. +<4> The number of active shard copies to wait for before the put follow API returns a +response, as an `ActiveShardCount` [id="{upid}-{api}-response"] ==== Response diff --git a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc index 22418db1088..212b1167b6e 100644 --- a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc @@ -22,7 +22,7 @@ replication options and whether the follower indices are active or paused. [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc index 766f502ff93..8c02582e012 100644 --- a/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-stats.asciidoc @@ -21,7 +21,7 @@ following tasks associated with each shard for the specified indices. [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/post-pause-follow.asciidoc b/docs/reference/ccr/apis/follow/post-pause-follow.asciidoc index 0d56ee76bd9..f5b0bef7b29 100644 --- a/docs/reference/ccr/apis/follow/post-pause-follow.asciidoc +++ b/docs/reference/ccr/apis/follow/post-pause-follow.asciidoc @@ -24,7 +24,7 @@ following task. [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/post-resume-follow.asciidoc b/docs/reference/ccr/apis/follow/post-resume-follow.asciidoc index e8b4cd50f27..736061f2bfd 100644 --- a/docs/reference/ccr/apis/follow/post-resume-follow.asciidoc +++ b/docs/reference/ccr/apis/follow/post-resume-follow.asciidoc @@ -23,7 +23,7 @@ returns, the follower index will resume fetching operations from the leader inde [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/post-unfollow.asciidoc b/docs/reference/ccr/apis/follow/post-unfollow.asciidoc index 6507c04ac50..c3126d02d1e 100644 --- a/docs/reference/ccr/apis/follow/post-unfollow.asciidoc +++ b/docs/reference/ccr/apis/follow/post-unfollow.asciidoc @@ -27,7 +27,7 @@ irreversible operation. [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/apis/follow/put-follow.asciidoc b/docs/reference/ccr/apis/follow/put-follow.asciidoc index 3f6156c1e68..52253d6ad2f 100644 --- a/docs/reference/ccr/apis/follow/put-follow.asciidoc +++ b/docs/reference/ccr/apis/follow/put-follow.asciidoc @@ -31,7 +31,7 @@ POST /follower_index/_ccr/pause_follow [source,js] -------------------------------------------------- -PUT //_ccr/follow +PUT //_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "", "leader_index" : "" @@ -43,6 +43,11 @@ PUT //_ccr/follow // TEST[s//remote_cluster/] // TEST[s//leader_index/] +The `wait_for_active_shards` parameter specifies the number of shards to wait on being active +before responding. This defaults to waiting on none of the shards to be active. A shard must +be restored from the leader index being active. Restoring a follower shard requires transferring +all the remote Lucene segment files to the follower index. + ==== Path Parameters `follower_index` (required):: @@ -73,7 +78,7 @@ This example creates a follower index named `follower_index`: [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index", diff --git a/docs/reference/ccr/apis/get-ccr-stats.asciidoc b/docs/reference/ccr/apis/get-ccr-stats.asciidoc index f47e49ee826..8949de8787f 100644 --- a/docs/reference/ccr/apis/get-ccr-stats.asciidoc +++ b/docs/reference/ccr/apis/get-ccr-stats.asciidoc @@ -22,7 +22,7 @@ shard-level stats as in the <>. [source,js] -------------------------------------------------- -PUT /follower_index/_ccr/follow +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "remote_cluster", "leader_index" : "leader_index" diff --git a/docs/reference/ccr/getting-started.asciidoc b/docs/reference/ccr/getting-started.asciidoc index 1af236f7d86..7c59b862805 100644 --- a/docs/reference/ccr/getting-started.asciidoc +++ b/docs/reference/ccr/getting-started.asciidoc @@ -230,7 +230,7 @@ cluster. [source,js] -------------------------------------------------- -PUT /server-metrics-copy/_ccr/follow +PUT /server-metrics-copy/_ccr/follow?wait_for_active_shards=1 { "remote_cluster" : "leader", "leader_index" : "server-metrics" diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java new file mode 100644 index 00000000000..a74aad3ddb5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.restore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.snapshots.RestoreInfo; +import org.elasticsearch.snapshots.RestoreService; + +import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; + +public class RestoreClusterStateListener implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class); + + private final ClusterService clusterService; + private final String uuid; + private final ActionListener listener; + + + private RestoreClusterStateListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response, + ActionListener listener) { + this.clusterService = clusterService; + this.uuid = response.getUuid(); + this.listener = listener; + } + + @Override + public void clusterChanged(ClusterChangedEvent changedEvent) { + final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); + final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); + if (prevEntry == null) { + // When there is a master failure after a restore has been started, this listener might not be registered + // on the current master and as such it might miss some intermediary cluster states due to batching. + // Clean up listener in that case and acknowledge completion of restore operation to client. + clusterService.removeListener(this); + listener.onResponse(new RestoreSnapshotResponse(null)); + } else if (newEntry == null) { + clusterService.removeListener(this); + ImmutableOpenMap shards = prevEntry.shards(); + assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); + assert RestoreService.completed(shards) : "expected all restore entries to be completed"; + RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(), + prevEntry.indices(), + shards.size(), + shards.size() - RestoreService.failedShards(shards)); + RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); + logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId()); + listener.onResponse(response); + } else { + // restore not completed yet, wait for next cluster state update + } + } + + /** + * Creates a cluster state listener and registers it with the cluster service. The listener passed as a + * parameter will be called when the restore is complete. + */ + public static void createAndRegisterListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response, + ActionListener listener) { + clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener)); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index b362be49b10..d8dcc5eb8f8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -22,26 +22,17 @@ package org.elasticsearch.action.admin.cluster.snapshots.restore; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse; -import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; - /** * Transport action for restore snapshot operation */ @@ -86,39 +77,7 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction shards = prevEntry.shards(); - assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); - assert RestoreService.completed(shards) : "expected all restore entries to be completed"; - RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(), - prevEntry.indices(), - shards.size(), - shards.size() - RestoreService.failedShards(shards)); - RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); - logger.debug("restore of [{}] completed", snapshot); - listener.onResponse(response); - } else { - // restore not completed yet, wait for next cluster state update - } - } - }; - - clusterService.addListener(clusterStateListener); + RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener); } else { listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); } diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml index f73f5c6dfb2..d5cd8ebd4f1 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml @@ -37,6 +37,7 @@ - do: ccr.follow: index: bar + wait_for_active_shards: 1 body: remote_cluster: local leader_index: foo diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml index f1e47d830cf..8383ecd4e68 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml @@ -33,6 +33,7 @@ - do: ccr.follow: index: bar + wait_for_active_shards: 1 body: remote_cluster: local leader_index: foo diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml index 5b3e6c18ef2..220463a60b2 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_stats.yml @@ -36,6 +36,7 @@ - do: ccr.follow: index: bar + wait_for_active_shards: 1 body: remote_cluster: local leader_index: foo diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/index_directly_into_follower_index.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/index_directly_into_follower_index.yml index 60c3b404b6f..62878437c37 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/index_directly_into_follower_index.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/index_directly_into_follower_index.yml @@ -37,6 +37,7 @@ - do: ccr.follow: index: bar + wait_for_active_shards: 1 body: remote_cluster: local leader_index: foo diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java index 25fbef7ada7..6cdb6b37961 100644 --- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -73,7 +73,7 @@ public class ESCCRRestTestCase extends ESRestTestCase { } protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException { - final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow"); + final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow?wait_for_active_shards=1"); request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex + "\", \"read_poll_timeout\": \"10ms\"}"); assertOK(client.performRequest(request)); @@ -186,6 +186,7 @@ public class ESCCRRestTestCase extends ESRestTestCase { protected static void ensureYellow(String index) throws IOException { Request request = new Request("GET", "/_cluster/health/" + index); request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_active_shards", "1"); request.addParameter("wait_for_no_relocating_shards", "true"); request.addParameter("wait_for_no_initializing_shards", "true"); request.addParameter("timeout", "70s"); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index bca8608904a..27f3b60fb52 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -6,52 +6,53 @@ package org.elasticsearch.xpack.ccr.action; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.elasticsearch.ResourceAlreadyExistsException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.snapshots.RestoreInfo; +import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; public final class TransportPutFollowAction - extends TransportMasterNodeAction { + extends TransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportPutFollowAction.class); private final Client client; - private final AllocationService allocationService; - private final ActiveShardsObserver activeShardsObserver; + private final RestoreService restoreService; private final CcrLicenseChecker ccrLicenseChecker; + private final ActiveShardsObserver activeShardsObserver; @Inject public TransportPutFollowAction( @@ -61,7 +62,7 @@ public final class TransportPutFollowAction final ActionFilters actionFilters, final IndexNameExpressionResolver indexNameExpressionResolver, final Client client, - final AllocationService allocationService, + final RestoreService restoreService, final CcrLicenseChecker ccrLicenseChecker) { super( PutFollowAction.NAME, @@ -72,9 +73,9 @@ public final class TransportPutFollowAction PutFollowAction.Request::new, indexNameExpressionResolver); this.client = client; - this.allocationService = allocationService; - this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); + this.restoreService = restoreService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker); + this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); } @Override @@ -96,7 +97,7 @@ public final class TransportPutFollowAction protected void masterOperation( final PutFollowAction.Request request, final ClusterState state, - final ActionListener listener) throws Exception { + final ActionListener listener) { if (ccrLicenseChecker.isCcrAllowed() == false) { listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; @@ -111,12 +112,11 @@ public final class TransportPutFollowAction remoteCluster, leaderIndex, listener::onFailure, - (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); + (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, request, listener)); } private void createFollowerIndex( final IndexMetaData leaderIndexMetaData, - final String[] historyUUIDs, final PutFollowAction.Request request, final ActionListener listener) { if (leaderIndexMetaData == null) { @@ -131,98 +131,107 @@ public final class TransportPutFollowAction return; } - ActionListener handler = ActionListener.wrap( - result -> { - if (result) { - initiateFollowing(request, listener); - } else { - listener.onResponse(new PutFollowAction.Response(true, false, false)); - } - }, - listener::onFailure); - // Can't use create index api here, because then index templates can alter the mappings / settings. - // And index templates could introduce settings / mappings that are incompatible with the leader index. - clusterService.submitStateUpdateTask("create_following_index", new AckedClusterStateUpdateTask(request, handler) { + final Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowerIndex()) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster(); + final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST) + .indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$") + .renameReplacement(request.getFollowRequest().getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout()) + .indexSettings(settingsBuilder); + + final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders()); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @Override - protected Boolean newResponse(final boolean acknowledged) { - return acknowledged; + public void onFailure(Exception e) { + listener.onFailure(e); } @Override - public ClusterState execute(final ClusterState currentState) throws Exception { - String followIndex = request.getFollowRequest().getFollowerIndex(); - IndexMetaData currentIndex = currentState.metaData().index(followIndex); - if (currentIndex != null) { - throw new ResourceAlreadyExistsException(currentIndex.getIndex()); + protected void doRun() throws Exception { + restoreService.restoreSnapshot(restoreRequest, new ActionListener() { + + @Override + public void onResponse(RestoreService.RestoreCompletionResponse response) { + afterRestoreStarted(clientWithHeaders, request, listener, response); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } + }); + } + + private void afterRestoreStarted(Client clientWithHeaders, PutFollowAction.Request request, + ActionListener originalListener, + RestoreService.RestoreCompletionResponse response) { + final ActionListener listener; + if (ActiveShardCount.NONE.equals(request.waitForActiveShards())) { + originalListener.onResponse(new PutFollowAction.Response(true, false, false)); + listener = new ActionListener() { + + @Override + public void onResponse(PutFollowAction.Response response) { + logger.debug("put follow {} completed with {}", request, response); } - MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); - IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex); - - // Adding the leader index uuid for each shard as custom metadata: - Map metadata = new HashMap<>(); - metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs)); - metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID()); - metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName()); - metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, request.getRemoteCluster()); - imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); - - // Copy all settings, but overwrite a few settings. - Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put(leaderIndexMetaData.getSettings()); - // Overwriting UUID here, because otherwise we can't follow indices in the same cluster - settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); - settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followIndex); - settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); - settingsBuilder.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); - imdBuilder.settings(settingsBuilder); - - // Copy mappings from leader IMD to follow IMD - for (ObjectObjectCursor cursor : leaderIndexMetaData.getMappings()) { - imdBuilder.putMapping(cursor.value); + @Override + public void onFailure(Exception e) { + logger.debug(() -> new ParameterizedMessage("put follow {} failed during the restore process", request), e); } - imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards()); - IndexMetaData followIMD = imdBuilder.build(); - mdBuilder.put(followIMD, false); + }; + } else { + listener = originalListener; + } - ClusterState.Builder builder = ClusterState.builder(currentState); - builder.metaData(mdBuilder.build()); - ClusterState updatedState = builder.build(); + RestoreClusterStateListener.createAndRegisterListener(clusterService, response, new ActionListener() { + @Override + public void onResponse(RestoreSnapshotResponse restoreSnapshotResponse) { + RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable()) - .addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowerIndex())); - updatedState = allocationService.reroute( - ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(), - "follow index [" + request.getFollowRequest().getFollowerIndex() + "] created"); + if (restoreInfo == null) { + // If restoreInfo is null then it is possible there was a master failure during the + // restore. + listener.onResponse(new PutFollowAction.Response(true, false, false)); + } else if (restoreInfo.failedShards() == 0) { + initiateFollowing(clientWithHeaders, request, listener); + } else { + assert restoreInfo.failedShards() > 0 : "Should have failed shards"; + listener.onResponse(new PutFollowAction.Response(true, false, false)); + } + } - logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]", - followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas()); - - return updatedState; + @Override + public void onFailure(Exception e) { + listener.onFailure(e); } }); } private void initiateFollowing( - final PutFollowAction.Request request, - final ActionListener listener) { + final Client client, + final PutFollowAction.Request request, + final ActionListener listener) { + assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT."; activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()}, - ActiveShardCount.DEFAULT, request.timeout(), result -> { - if (result) { - client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap( - r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())), - listener::onFailure - )); - } else { - listener.onResponse(new PutFollowAction.Response(true, false, false)); - } - }, listener::onFailure); + request.waitForActiveShards(), request.timeout(), result -> { + if (result) { + client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap( + r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())), + listener::onFailure + )); + } else { + listener.onResponse(new PutFollowAction.Response(true, false, false)); + } + }, listener::onFailure); } @Override protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) { return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex()); } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 33a8c64c961..7ca95a14909 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.ccr.repository; +import com.carrotsearch.hppc.cursors.IntObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; @@ -81,6 +83,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit public static final String TYPE = "_ccr_"; public static final String NAME_PREFIX = "_ccr_"; private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST); + private static final String IN_SYNC_ALLOCATION_ID = "ccr_restore"; private final RepositoryMetaData metadata; private final CcrSettings ccrSettings; @@ -157,7 +160,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse); String[] leaderHistoryUUIDs = future.actionGet(); - IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndexMetaData); + IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndex); // Adding the leader index uuid for each shard as custom metadata: Map metadata = new HashMap<>(); metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", leaderHistoryUUIDs)); @@ -166,6 +169,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, remoteClusterAlias); imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); + imdBuilder.settings(leaderIndexMetaData.getSettings()); + + // Copy mappings from leader IMD to follow IMD + for (ObjectObjectCursor cursor : leaderIndexMetaData.getMappings()) { + imdBuilder.putMapping(cursor.value); + } + + imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards()); + // We assert that insync allocation ids are not empty in `PrimaryShardAllocator` + for (IntObjectCursor> entry : leaderIndexMetaData.getInSyncAllocationIds()) { + imdBuilder.putInSyncAllocationIds(entry.key, Collections.singleton(IN_SYNC_ALLOCATION_ID)); + } + return imdBuilder.build(); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutFollowAction.java index 7b21422cb98..d7a2edd21d2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPutFollowAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.rest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; @@ -38,7 +39,8 @@ public class RestPutFollowAction extends BaseRestHandler { static Request createRequest(RestRequest restRequest) throws IOException { try (XContentParser parser = restRequest.contentOrSourceParamParser()) { - return Request.fromXContent(parser, restRequest.param("index")); + ActiveShardCount waitForActiveShards = ActiveShardCount.parseString(restRequest.param("wait_for_active_shards")); + return Request.fromXContent(parser, restRequest.param("index"), waitForActiveShards); } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 65fd80325e7..4a6c5411737 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; @@ -277,8 +278,13 @@ public abstract class CcrIntegTestCase extends ESTestCase { } protected final ClusterHealthStatus ensureFollowerGreen(String... indices) { + return ensureFollowerGreen(false, indices); + } + + protected final ClusterHealthStatus ensureFollowerGreen(boolean waitForNoInitializingShards, String... indices) { logger.info("ensure green follower indices {}", Arrays.toString(indices)); - return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), false, indices); + return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), + waitForNoInitializingShards, indices); } private ClusterHealthStatus ensureColor(TestCluster testCluster, @@ -411,10 +417,15 @@ public abstract class CcrIntegTestCase extends ESTestCase { } public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) { + return putFollow(leaderIndex, followerIndex, ActiveShardCount.ONE); + } + + public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex, ActiveShardCount waitForActiveShards) { PutFollowAction.Request request = new PutFollowAction.Request(); request.setRemoteCluster("leader_cluster"); request.setLeaderIndex(leaderIndex); request.setFollowRequest(resumeFollow(followerIndex)); + request.waitForActiveShards(waitForActiveShards); return request; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index ad8f545fa9d..48531c7d28f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -98,6 +99,7 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase { request.setRemoteCluster("local"); request.setLeaderIndex(leaderIndex); request.setFollowRequest(getResumeFollowRequest(followerIndex)); + request.waitForActiveShards(ActiveShardCount.ONE); return request; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 648c295efa8..dec671f6e63 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; @@ -30,7 +31,10 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterIndexHealth; +import org.elasticsearch.cluster.health.ClusterShardHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; @@ -91,14 +95,12 @@ public class IndexFollowingIT extends CcrIntegTestCase { public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); - final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + int numberOfReplicas = between(0, 1); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderYellow("index1"); - final PutFollowAction.Request followRequest = putFollow("index1", "index2"); - followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); - final int firstBatchNumDocs = randomIntBetween(2, 64); logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); for (int i = 0; i < firstBatchNumDocs; i++) { @@ -106,6 +108,30 @@ public class IndexFollowingIT extends CcrIntegTestCase { leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } + boolean waitOnAll = randomBoolean(); + + final PutFollowAction.Request followRequest; + if (waitOnAll) { + followRequest = putFollow("index1", "index2", ActiveShardCount.ALL); + } else { + followRequest = putFollow("index1", "index2", ActiveShardCount.ONE); + } + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertTrue(response.isFollowIndexCreated()); + assertTrue(response.isFollowIndexShardsAcked()); + assertTrue(response.isIndexFollowingStarted()); + + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true); + ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).actionGet().getIndices().get("index2"); + for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) { + if (waitOnAll) { + assertTrue(shardHealth.isPrimaryActive()); + assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards()); + } else { + assertTrue(shardHealth.isPrimaryActive()); + } + } + final Map firstBatchNumDocsPerShard = new HashMap<>(); final ShardStats[] firstBatchShardStats = leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); @@ -152,6 +178,119 @@ public class IndexFollowingIT extends CcrIntegTestCase { assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards); } + public void testFollowIndexWithConcurrentMappingChanges() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int firstBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + AtomicBoolean isRunning = new AtomicBoolean(true); + + // Concurrently index new docs with mapping changes + Thread thread = new Thread(() -> { + int docID = 10000; + char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray(); + for (char c : chars) { + if (isRunning.get() == false) { + break; + } + final String source; + long valueToPutInDoc = randomLongBetween(0, 50000); + if (randomBoolean()) { + source = String.format(Locale.ROOT, "{\"%c\":%d}", c, valueToPutInDoc); + } else { + source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, valueToPutInDoc); + } + for (int i = 1; i < 10; i++) { + if (isRunning.get() == false) { + break; + } + leaderClient().prepareIndex("index1", "doc", Long.toString(docID++)).setSource(source, XContentType.JSON).get(); + if (rarely()) { + leaderClient().admin().indices().prepareFlush("index1").setForce(true).get(); + } + } + leaderClient().admin().indices().prepareFlush("index1").setForce(true).setWaitIfOngoing(true).get(); + } + }); + thread.start(); + + final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen("index2"); + + for (int i = 0; i < firstBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + + final int secondBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs); + for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + + isRunning.set(false); + thread.join(); + } + + public void testFollowIndexWithoutWaitForComplete() throws Exception { + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderYellow("index1"); + + final int firstBatchNumDocs = randomIntBetween(2, 64); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE); + PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + assertTrue(response.isFollowIndexCreated()); + assertFalse(response.isFollowIndexShardsAcked()); + assertFalse(response.isIndexFollowingStarted()); + + // Check that the index exists, would throw index not found exception if the index is missing + followerClient().admin().indices().prepareGetIndex().addIndices("index2").get(); + ensureFollowerGreen(true, "index2"); + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = + leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard)); + + for (int i = 0; i < firstBatchNumDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs); + pauseFollow("index2"); + } + public void testSyncMappings() throws Exception { final String leaderIndexSettings = getIndexSettings(2, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 2c50411971a..f50f17c9e29 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -39,14 +39,14 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); ensureGreen("leader"); - final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); - client().execute(PutFollowAction.INSTANCE, followRequest).get(); - final long firstBatchNumDocs = randomIntBetween(2, 64); for (int i = 0; i < firstBatchNumDocs; i++) { client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get(); } + final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + assertBusy(() -> { assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs)); }); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java index 726a1c9893a..d32a773ebe2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutFollowActionRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -30,7 +31,7 @@ public class PutFollowActionRequestTests extends AbstractSerializingTestCase { ObjectParser.ValueType.STRING); } - public static Request fromXContent(final XContentParser parser, final String followerIndex) throws IOException { + public static Request fromXContent(final XContentParser parser, final String followerIndex, ActiveShardCount waitForActiveShards) + throws IOException { Request request = PARSER.parse(parser, followerIndex); if (followerIndex != null) { if (request.getFollowRequest().getFollowerIndex() == null) { @@ -116,11 +119,13 @@ public final class PutFollowAction extends Action { } } } + request.waitForActiveShards(waitForActiveShards); return request; } private String remoteCluster; private String leaderIndex; + private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE; private ResumeFollowAction.Request followRequest; public Request() { @@ -142,6 +147,27 @@ public final class PutFollowAction extends Action { this.leaderIndex = leaderIndex; } + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + + /** + * Sets the number of shard copies that should be active for follower index creation to + * return. Defaults to {@link ActiveShardCount#NONE}, which will not wait for any shards + * to be active. Set this value to {@link ActiveShardCount#DEFAULT} to wait for the primary + * shard to be active. Set this value to {@link ActiveShardCount#ALL} to wait for all shards + * (primary and all replicas) to be active before returning. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public void waitForActiveShards(ActiveShardCount waitForActiveShards) { + if (waitForActiveShards.equals(ActiveShardCount.DEFAULT)) { + this.waitForActiveShards = ActiveShardCount.NONE; + } else { + this.waitForActiveShards = waitForActiveShards; + } + } + public ResumeFollowAction.Request getFollowRequest() { return followRequest; } @@ -176,6 +202,10 @@ public final class PutFollowAction extends Action { super(in); remoteCluster = in.readString(); leaderIndex = in.readString(); + // TODO: Update after backport + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + waitForActiveShards(ActiveShardCount.readFrom(in)); + } followRequest = new ResumeFollowAction.Request(in); } @@ -184,6 +214,10 @@ public final class PutFollowAction extends Action { super.writeTo(out); out.writeString(remoteCluster); out.writeString(leaderIndex); + // TODO: Update after backport + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + waitForActiveShards.writeTo(out); + } followRequest.writeTo(out); } @@ -206,12 +240,23 @@ public final class PutFollowAction extends Action { Request request = (Request) o; return Objects.equals(remoteCluster, request.remoteCluster) && Objects.equals(leaderIndex, request.leaderIndex) && + Objects.equals(waitForActiveShards, request.waitForActiveShards) && Objects.equals(followRequest, request.followRequest); } @Override public int hashCode() { - return Objects.hash(remoteCluster, leaderIndex, followRequest); + return Objects.hash(remoteCluster, leaderIndex, waitForActiveShards, followRequest); + } + + @Override + public String toString() { + return "PutFollowAction.Request{" + + "remoteCluster='" + remoteCluster + '\'' + + ", leaderIndex='" + leaderIndex + '\'' + + ", waitForActiveShards=" + waitForActiveShards + + ", followRequest=" + followRequest + + '}'; } } @@ -280,6 +325,15 @@ public final class PutFollowAction extends Action { public int hashCode() { return Objects.hash(followIndexCreated, followIndexShardsAcked, indexFollowingStarted); } + + @Override + public String toString() { + return "PutFollowAction.Response{" + + "followIndexCreated=" + followIndexCreated + + ", followIndexShardsAcked=" + followIndexShardsAcked + + ", indexFollowingStarted=" + indexFollowingStarted + + '}'; + } } } diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java index 65baeb5f168..f8ffce9cd81 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java @@ -61,6 +61,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { // Policy with the same name must exist in follower cluster too: putILMPolicy(policyName, "50GB", null, TimeValue.timeValueHours(7*24)); followIndex(indexName, indexName); + ensureGreen(indexName); // Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster: client().performRequest(new Request("PUT", "/" + indexName + "/_alias/logs")); @@ -116,6 +117,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { } else if ("follow".equals(targetCluster)) { createNewSingletonPolicy("unfollow-only", "hot", new UnfollowAction(), TimeValue.ZERO); followIndex(indexName, indexName); + ensureGreen(indexName); // Create the repository before taking the snapshot. Request request = new Request("PUT", "/_snapshot/repo"); @@ -210,7 +212,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { "\"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}, " + "\"aliases\": {\"" + alias + "\": {\"is_write_index\": true}} }"); assertOK(leaderClient.performRequest(createIndexRequest)); - // Check that the new index is creeg + // Check that the new index is created Request checkIndexRequest = new Request("GET", "/_cluster/health/" + indexName); checkIndexRequest.addParameter("wait_for_status", "green"); checkIndexRequest.addParameter("timeout", "70s"); @@ -226,6 +228,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { index(leaderClient, indexName, "1"); assertDocumentExists(leaderClient, indexName, "1"); + ensureGreen(indexName); assertBusy(() -> { assertDocumentExists(client(), indexName, "1"); // Sanity check that following_index setting has been set, so that we can verify later that this setting has been unset: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow.json index 635a4e62683..588dd602612 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow.json @@ -11,6 +11,13 @@ "required": true, "description": "The name of the follower index" } + }, + "params": { + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of shard copies that must be active before returning. Defaults to 0. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)", + "default": "0" + } } }, "body": {