Use `CcrRepository` to init follower index (#35719)

This commit modifies the put follow index action to use a
CcrRepository when creating a follower index. It routes 
the logic through the snapshot/restore process. A 
wait_for_active_shards parameter can be used to configure
how long to wait before returning the response.
This commit is contained in:
Tim Brooks 2019-01-29 11:47:29 -07:00 committed by GitHub
parent d05a4b9d14
commit 00ace369af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 488 additions and 169 deletions

View File

@ -46,6 +46,8 @@ final class CcrRequestConverters {
.addPathPartAsIs("_ccr", "follow")
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params parameters = new RequestConverters.Params(request);
parameters.withWaitForActiveShards(putFollowRequest.waitForActiveShards());
request.setEntity(createEntity(putFollowRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.client.ccr;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
@ -36,11 +37,17 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,
private final String remoteCluster;
private final String leaderIndex;
private final String followerIndex;
private final ActiveShardCount waitForActiveShards;
public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex) {
this(remoteCluster, leaderIndex, followerIndex, ActiveShardCount.NONE);
}
public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex, ActiveShardCount waitForActiveShards) {
this.remoteCluster = Objects.requireNonNull(remoteCluster, "remoteCluster");
this.leaderIndex = Objects.requireNonNull(leaderIndex, "leaderIndex");
this.followerIndex = Objects.requireNonNull(followerIndex, "followerIndex");
this.waitForActiveShards = waitForActiveShards;
}
@Override
@ -66,13 +73,18 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,
return followerIndex;
}
public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
PutFollowRequest that = (PutFollowRequest) o;
return Objects.equals(remoteCluster, that.remoteCluster) &&
return Objects.equals(waitForActiveShards, that.waitForActiveShards) &&
Objects.equals(remoteCluster, that.remoteCluster) &&
Objects.equals(leaderIndex, that.leaderIndex) &&
Objects.equals(followerIndex, that.followerIndex);
}
@ -83,7 +95,7 @@ public final class PutFollowRequest extends FollowConfig implements Validatable,
super.hashCode(),
remoteCluster,
leaderIndex,
followerIndex
);
followerIndex,
waitForActiveShards);
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.ccr.CcrStatsRequest;
import org.elasticsearch.client.ccr.CcrStatsResponse;
@ -95,7 +96,7 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
CreateIndexResponse response = highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
assertThat(response.isAcknowledged(), is(true));
PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower");
PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequ
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
@ -97,7 +98,8 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
PutFollowRequest putFollowRequest = new PutFollowRequest(
"local", // <1>
"leader", // <2>
"follower" // <3>
"follower", // <3>
ActiveShardCount.ONE // <4>
);
// end::ccr-put-follow-request
@ -175,7 +177,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
String followIndex = "follower";
// Follow index, so that it can be paused:
{
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
@ -241,7 +243,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
String followIndex = "follower";
// Follow index, so that it can be paused:
{
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
@ -317,7 +319,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
String followIndex = "follower";
// Follow index, pause and close, so that it can be unfollowed:
{
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
@ -349,7 +351,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(followIndex);
assertThat(client.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT).isAcknowledged(), is(true));
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex);
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex, ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
@ -639,7 +641,7 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
}
{
// Follow index, so that we can query for follow stats:
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower");
PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", "follower", ActiveShardCount.ONE);
PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));

View File

@ -20,6 +20,8 @@ include-tagged::{doc-tests-file}[{api}-request]
<1> The name of the remote cluster alias.
<2> The name of the leader in the remote cluster.
<3> The name of the follower index that gets created as part of the put follow API call.
<4> The number of active shard copies to wait for before the put follow API returns a
response, as an `ActiveShardCount`
[id="{upid}-{api}-response"]
==== Response

View File

@ -22,7 +22,7 @@ replication options and whether the follower indices are active or paused.
[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"

View File

@ -21,7 +21,7 @@ following tasks associated with each shard for the specified indices.
[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"

View File

@ -24,7 +24,7 @@ following task.
[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"

View File

@ -23,7 +23,7 @@ returns, the follower index will resume fetching operations from the leader inde
[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"

View File

@ -27,7 +27,7 @@ irreversible operation.
[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"

View File

@ -31,7 +31,7 @@ POST /follower_index/_ccr/pause_follow
[source,js]
--------------------------------------------------
PUT /<follower_index>/_ccr/follow
PUT /<follower_index>/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "<remote_cluster>",
"leader_index" : "<leader_index>"
@ -43,6 +43,11 @@ PUT /<follower_index>/_ccr/follow
// TEST[s/<remote_cluster>/remote_cluster/]
// TEST[s/<leader_index>/leader_index/]
The `wait_for_active_shards` parameter specifies the number of shards to wait on being active
before responding. This defaults to waiting on none of the shards to be active. A shard must
be restored from the leader index being active. Restoring a follower shard requires transferring
all the remote Lucene segment files to the follower index.
==== Path Parameters
`follower_index` (required)::
@ -73,7 +78,7 @@ This example creates a follower index named `follower_index`:
[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index",

View File

@ -22,7 +22,7 @@ shard-level stats as in the <<ccr-get-follow-stats,get follower stats API>>.
[source,js]
--------------------------------------------------
PUT /follower_index/_ccr/follow
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index"

View File

@ -230,7 +230,7 @@ cluster.
[source,js]
--------------------------------------------------
PUT /server-metrics-copy/_ccr/follow
PUT /server-metrics-copy/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "leader",
"leader_index" : "server-metrics"

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.snapshots.restore;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
public class RestoreClusterStateListener implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class);
private final ClusterService clusterService;
private final String uuid;
private final ActionListener<RestoreSnapshotResponse> listener;
private RestoreClusterStateListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener) {
this.clusterService = clusterService;
this.uuid = response.getUuid();
this.listener = listener;
}
@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse(null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards));
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
}
}
/**
* Creates a cluster state listener and registers it with the cluster service. The listener passed as a
* parameter will be called when the restore is complete.
*/
public static void createAndRegisterListener(ClusterService clusterService, RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener) {
clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener));
}
}

View File

@ -22,26 +22,17 @@ package org.elasticsearch.action.admin.cluster.snapshots.restore;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
/**
* Transport action for restore snapshot operation
*/
@ -86,39 +77,7 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
@Override
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
String uuid = restoreCompletionResponse.getUuid();
ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse(null));
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards));
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", snapshot);
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
}
}
};
clusterService.addListener(clusterStateListener);
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener);
} else {
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}

View File

@ -37,6 +37,7 @@
- do:
ccr.follow:
index: bar
wait_for_active_shards: 1
body:
remote_cluster: local
leader_index: foo

View File

@ -33,6 +33,7 @@
- do:
ccr.follow:
index: bar
wait_for_active_shards: 1
body:
remote_cluster: local
leader_index: foo

View File

@ -36,6 +36,7 @@
- do:
ccr.follow:
index: bar
wait_for_active_shards: 1
body:
remote_cluster: local
leader_index: foo

View File

@ -37,6 +37,7 @@
- do:
ccr.follow:
index: bar
wait_for_active_shards: 1
body:
remote_cluster: local
leader_index: foo

View File

@ -73,7 +73,7 @@ public class ESCCRRestTestCase extends ESRestTestCase {
}
protected static void followIndex(RestClient client, String leaderCluster, String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow?wait_for_active_shards=1");
request.setJsonEntity("{\"remote_cluster\": \"" + leaderCluster + "\", \"leader_index\": \"" + leaderIndex +
"\", \"read_poll_timeout\": \"10ms\"}");
assertOK(client.performRequest(request));
@ -186,6 +186,7 @@ public class ESCCRRestTestCase extends ESRestTestCase {
protected static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_active_shards", "1");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "70s");

View File

@ -6,52 +6,53 @@
package org.elasticsearch.xpack.ccr.action;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public final class TransportPutFollowAction
extends TransportMasterNodeAction<PutFollowAction.Request, PutFollowAction.Response> {
extends TransportMasterNodeAction<PutFollowAction.Request, PutFollowAction.Response> {
private static final Logger logger = LogManager.getLogger(TransportPutFollowAction.class);
private final Client client;
private final AllocationService allocationService;
private final ActiveShardsObserver activeShardsObserver;
private final RestoreService restoreService;
private final CcrLicenseChecker ccrLicenseChecker;
private final ActiveShardsObserver activeShardsObserver;
@Inject
public TransportPutFollowAction(
@ -61,7 +62,7 @@ public final class TransportPutFollowAction
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Client client,
final AllocationService allocationService,
final RestoreService restoreService,
final CcrLicenseChecker ccrLicenseChecker) {
super(
PutFollowAction.NAME,
@ -72,9 +73,9 @@ public final class TransportPutFollowAction
PutFollowAction.Request::new,
indexNameExpressionResolver);
this.client = client;
this.allocationService = allocationService;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.restoreService = restoreService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
}
@Override
@ -96,7 +97,7 @@ public final class TransportPutFollowAction
protected void masterOperation(
final PutFollowAction.Request request,
final ClusterState state,
final ActionListener<PutFollowAction.Response> listener) throws Exception {
final ActionListener<PutFollowAction.Response> listener) {
if (ccrLicenseChecker.isCcrAllowed() == false) {
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
@ -111,12 +112,11 @@ public final class TransportPutFollowAction
remoteCluster,
leaderIndex,
listener::onFailure,
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, request, listener));
}
private void createFollowerIndex(
final IndexMetaData leaderIndexMetaData,
final String[] historyUUIDs,
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
if (leaderIndexMetaData == null) {
@ -131,98 +131,107 @@ public final class TransportPutFollowAction
return;
}
ActionListener<Boolean> handler = ActionListener.wrap(
result -> {
if (result) {
initiateFollowing(request, listener);
} else {
listener.onResponse(new PutFollowAction.Response(true, false, false));
}
},
listener::onFailure);
// Can't use create index api here, because then index templates can alter the mappings / settings.
// And index templates could introduce settings / mappings that are incompatible with the leader index.
clusterService.submitStateUpdateTask("create_following_index", new AckedClusterStateUpdateTask<Boolean>(request, handler) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowerIndex())
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
.indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$")
.renameReplacement(request.getFollowRequest().getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.indexSettings(settingsBuilder);
final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
@Override
protected Boolean newResponse(final boolean acknowledged) {
return acknowledged;
public void onFailure(Exception e) {
listener.onFailure(e);
}
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
String followIndex = request.getFollowRequest().getFollowerIndex();
IndexMetaData currentIndex = currentState.metaData().index(followIndex);
if (currentIndex != null) {
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
protected void doRun() throws Exception {
restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreService.RestoreCompletionResponse>() {
@Override
public void onResponse(RestoreService.RestoreCompletionResponse response) {
afterRestoreStarted(clientWithHeaders, request, listener, response);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
});
}
private void afterRestoreStarted(Client clientWithHeaders, PutFollowAction.Request request,
ActionListener<PutFollowAction.Response> originalListener,
RestoreService.RestoreCompletionResponse response) {
final ActionListener<PutFollowAction.Response> listener;
if (ActiveShardCount.NONE.equals(request.waitForActiveShards())) {
originalListener.onResponse(new PutFollowAction.Response(true, false, false));
listener = new ActionListener<PutFollowAction.Response>() {
@Override
public void onResponse(PutFollowAction.Response response) {
logger.debug("put follow {} completed with {}", request, response);
}
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex);
// Adding the leader index uuid for each shard as custom metadata:
Map<String, String> metadata = new HashMap<>();
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName());
metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, request.getRemoteCluster());
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);
// Copy all settings, but overwrite a few settings.
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(leaderIndexMetaData.getSettings());
// Overwriting UUID here, because otherwise we can't follow indices in the same cluster
settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followIndex);
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
settingsBuilder.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
imdBuilder.settings(settingsBuilder);
// Copy mappings from leader IMD to follow IMD
for (ObjectObjectCursor<String, MappingMetaData> cursor : leaderIndexMetaData.getMappings()) {
imdBuilder.putMapping(cursor.value);
@Override
public void onFailure(Exception e) {
logger.debug(() -> new ParameterizedMessage("put follow {} failed during the restore process", request), e);
}
imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards());
IndexMetaData followIMD = imdBuilder.build();
mdBuilder.put(followIMD, false);
};
} else {
listener = originalListener;
}
ClusterState.Builder builder = ClusterState.builder(currentState);
builder.metaData(mdBuilder.build());
ClusterState updatedState = builder.build();
RestoreClusterStateListener.createAndRegisterListener(clusterService, response, new ActionListener<RestoreSnapshotResponse>() {
@Override
public void onResponse(RestoreSnapshotResponse restoreSnapshotResponse) {
RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowerIndex()));
updatedState = allocationService.reroute(
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
"follow index [" + request.getFollowRequest().getFollowerIndex() + "] created");
if (restoreInfo == null) {
// If restoreInfo is null then it is possible there was a master failure during the
// restore.
listener.onResponse(new PutFollowAction.Response(true, false, false));
} else if (restoreInfo.failedShards() == 0) {
initiateFollowing(clientWithHeaders, request, listener);
} else {
assert restoreInfo.failedShards() > 0 : "Should have failed shards";
listener.onResponse(new PutFollowAction.Response(true, false, false));
}
}
logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]",
followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas());
return updatedState;
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
private void initiateFollowing(
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
final Client client,
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "PutFollowAction does not support DEFAULT.";
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()},
ActiveShardCount.DEFAULT, request.timeout(), result -> {
if (result) {
client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
} else {
listener.onResponse(new PutFollowAction.Response(true, false, false));
}
}, listener::onFailure);
request.waitForActiveShards(), request.timeout(), result -> {
if (result) {
client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
} else {
listener.onResponse(new PutFollowAction.Response(true, false, false));
}
}, listener::onFailure);
}
@Override
protected ClusterBlockException checkBlock(final PutFollowAction.Request request, final ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex());
}
}

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.ccr.repository;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
@ -81,6 +83,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
public static final String TYPE = "_ccr_";
public static final String NAME_PREFIX = "_ccr_";
private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST);
private static final String IN_SYNC_ALLOCATION_ID = "ccr_restore";
private final RepositoryMetaData metadata;
private final CcrSettings ccrSettings;
@ -157,7 +160,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
String[] leaderHistoryUUIDs = future.actionGet();
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndexMetaData);
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(leaderIndex);
// Adding the leader index uuid for each shard as custom metadata:
Map<String, String> metadata = new HashMap<>();
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", leaderHistoryUUIDs));
@ -166,6 +169,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, remoteClusterAlias);
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);
imdBuilder.settings(leaderIndexMetaData.getSettings());
// Copy mappings from leader IMD to follow IMD
for (ObjectObjectCursor<String, MappingMetaData> cursor : leaderIndexMetaData.getMappings()) {
imdBuilder.putMapping(cursor.value);
}
imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards());
// We assert that insync allocation ids are not empty in `PrimaryShardAllocator`
for (IntObjectCursor<Set<String>> entry : leaderIndexMetaData.getInSyncAllocationIds()) {
imdBuilder.putInSyncAllocationIds(entry.key, Collections.singleton(IN_SYNC_ALLOCATION_ID));
}
return imdBuilder.build();
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.rest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
@ -38,7 +39,8 @@ public class RestPutFollowAction extends BaseRestHandler {
static Request createRequest(RestRequest restRequest) throws IOException {
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
return Request.fromXContent(parser, restRequest.param("index"));
ActiveShardCount waitForActiveShards = ActiveShardCount.parseString(restRequest.param("wait_for_active_shards"));
return Request.fromXContent(parser, restRequest.param("index"), waitForActiveShards);
}
}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
@ -277,8 +278,13 @@ public abstract class CcrIntegTestCase extends ESTestCase {
}
protected final ClusterHealthStatus ensureFollowerGreen(String... indices) {
return ensureFollowerGreen(false, indices);
}
protected final ClusterHealthStatus ensureFollowerGreen(boolean waitForNoInitializingShards, String... indices) {
logger.info("ensure green follower indices {}", Arrays.toString(indices));
return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30), false, indices);
return ensureColor(clusterGroup.followerCluster, ClusterHealthStatus.GREEN, TimeValue.timeValueSeconds(30),
waitForNoInitializingShards, indices);
}
private ClusterHealthStatus ensureColor(TestCluster testCluster,
@ -411,10 +417,15 @@ public abstract class CcrIntegTestCase extends ESTestCase {
}
public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex) {
return putFollow(leaderIndex, followerIndex, ActiveShardCount.ONE);
}
public static PutFollowAction.Request putFollow(String leaderIndex, String followerIndex, ActiveShardCount waitForActiveShards) {
PutFollowAction.Request request = new PutFollowAction.Request();
request.setRemoteCluster("leader_cluster");
request.setLeaderIndex(leaderIndex);
request.setFollowRequest(resumeFollow(followerIndex));
request.waitForActiveShards(waitForActiveShards);
return request;
}

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -98,6 +99,7 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
request.setRemoteCluster("local");
request.setLeaderIndex(leaderIndex);
request.setFollowRequest(getResumeFollowRequest(followerIndex));
request.waitForActiveShards(ActiveShardCount.ONE);
return request;
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ccr;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@ -30,7 +31,10 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
@ -91,14 +95,12 @@ public class IndexFollowingIT extends CcrIntegTestCase {
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
int numberOfReplicas = between(0, 1);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
final int firstBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
@ -106,6 +108,30 @@ public class IndexFollowingIT extends CcrIntegTestCase {
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
boolean waitOnAll = randomBoolean();
final PutFollowAction.Request followRequest;
if (waitOnAll) {
followRequest = putFollow("index1", "index2", ActiveShardCount.ALL);
} else {
followRequest = putFollow("index1", "index2", ActiveShardCount.ONE);
}
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
assertTrue(response.isFollowIndexCreated());
assertTrue(response.isFollowIndexShardsAcked());
assertTrue(response.isIndexFollowingStarted());
ClusterHealthRequest healthRequest = Requests.clusterHealthRequest("index2").waitForNoRelocatingShards(true);
ClusterIndexHealth indexHealth = followerClient().admin().cluster().health(healthRequest).actionGet().getIndices().get("index2");
for (ClusterShardHealth shardHealth : indexHealth.getShards().values()) {
if (waitOnAll) {
assertTrue(shardHealth.isPrimaryActive());
assertEquals(1 + numberOfReplicas, shardHealth.getActiveShards());
} else {
assertTrue(shardHealth.isPrimaryActive());
}
}
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] firstBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
@ -152,6 +178,119 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
}
public void testFollowIndexWithConcurrentMappingChanges() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final int firstBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
AtomicBoolean isRunning = new AtomicBoolean(true);
// Concurrently index new docs with mapping changes
Thread thread = new Thread(() -> {
int docID = 10000;
char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray();
for (char c : chars) {
if (isRunning.get() == false) {
break;
}
final String source;
long valueToPutInDoc = randomLongBetween(0, 50000);
if (randomBoolean()) {
source = String.format(Locale.ROOT, "{\"%c\":%d}", c, valueToPutInDoc);
} else {
source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, valueToPutInDoc);
}
for (int i = 1; i < 10; i++) {
if (isRunning.get() == false) {
break;
}
leaderClient().prepareIndex("index1", "doc", Long.toString(docID++)).setSource(source, XContentType.JSON).get();
if (rarely()) {
leaderClient().admin().indices().prepareFlush("index1").setForce(true).get();
}
}
leaderClient().admin().indices().prepareFlush("index1").setForce(true).setWaitIfOngoing(true).get();
}
});
thread.start();
final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen("index2");
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
final int secondBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
isRunning.set(false);
thread.join();
}
public void testFollowIndexWithoutWaitForComplete() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderYellow("index1");
final int firstBatchNumDocs = randomIntBetween(2, 64);
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}
final PutFollowAction.Request followRequest = putFollow("index1", "index2", ActiveShardCount.NONE);
PutFollowAction.Response response = followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
assertTrue(response.isFollowIndexCreated());
assertFalse(response.isFollowIndexShardsAcked());
assertFalse(response.isIndexFollowingStarted());
// Check that the index exists, would throw index not found exception if the index is missing
followerClient().admin().indices().prepareGetIndex().addIndices("index2").get();
ensureFollowerGreen(true, "index2");
final Map<ShardId, Long> firstBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] firstBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
for (final ShardStats shardStats : firstBatchShardStats) {
if (shardStats.getShardRouting().primary()) {
long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1;
firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value);
}
}
assertBusy(assertTask(numberOfPrimaryShards, firstBatchNumDocsPerShard));
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
assertTotalNumberOfOptimizedIndexing(resolveFollowerIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
pauseFollow("index2");
}
public void testSyncMappings() throws Exception {
final String leaderIndexSettings = getIndexSettings(2, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));

View File

@ -39,14 +39,14 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader");
final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
final long firstBatchNumDocs = randomIntBetween(2, 64);
for (int i = 0; i < firstBatchNumDocs; i++) {
client().prepareIndex("leader", "doc").setSource("{}", XContentType.JSON).get();
}
final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower");
client().execute(PutFollowAction.INSTANCE, followRequest).get();
assertBusy(() -> {
assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs));
});

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
@ -30,7 +31,7 @@ public class PutFollowActionRequestTests extends AbstractSerializingTestCase<Put
@Override
protected PutFollowAction.Request doParseInstance(XContentParser parser) throws IOException {
return PutFollowAction.Request.fromXContent(parser, null);
return PutFollowAction.Request.fromXContent(parser, null, ActiveShardCount.DEFAULT);
}
@Override

View File

@ -6,10 +6,12 @@
package org.elasticsearch.xpack.core.ccr.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
@ -28,10 +30,10 @@ import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.FOLLOWER_INDEX_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_READ_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_WRITE_REQUESTS;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT;
import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE;
@ -105,7 +107,8 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
ObjectParser.ValueType.STRING);
}
public static Request fromXContent(final XContentParser parser, final String followerIndex) throws IOException {
public static Request fromXContent(final XContentParser parser, final String followerIndex, ActiveShardCount waitForActiveShards)
throws IOException {
Request request = PARSER.parse(parser, followerIndex);
if (followerIndex != null) {
if (request.getFollowRequest().getFollowerIndex() == null) {
@ -116,11 +119,13 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
}
}
}
request.waitForActiveShards(waitForActiveShards);
return request;
}
private String remoteCluster;
private String leaderIndex;
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private ResumeFollowAction.Request followRequest;
public Request() {
@ -142,6 +147,27 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
this.leaderIndex = leaderIndex;
}
public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
/**
* Sets the number of shard copies that should be active for follower index creation to
* return. Defaults to {@link ActiveShardCount#NONE}, which will not wait for any shards
* to be active. Set this value to {@link ActiveShardCount#DEFAULT} to wait for the primary
* shard to be active. Set this value to {@link ActiveShardCount#ALL} to wait for all shards
* (primary and all replicas) to be active before returning.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public void waitForActiveShards(ActiveShardCount waitForActiveShards) {
if (waitForActiveShards.equals(ActiveShardCount.DEFAULT)) {
this.waitForActiveShards = ActiveShardCount.NONE;
} else {
this.waitForActiveShards = waitForActiveShards;
}
}
public ResumeFollowAction.Request getFollowRequest() {
return followRequest;
}
@ -176,6 +202,10 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
super(in);
remoteCluster = in.readString();
leaderIndex = in.readString();
// TODO: Update after backport
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
waitForActiveShards(ActiveShardCount.readFrom(in));
}
followRequest = new ResumeFollowAction.Request(in);
}
@ -184,6 +214,10 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
super.writeTo(out);
out.writeString(remoteCluster);
out.writeString(leaderIndex);
// TODO: Update after backport
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
waitForActiveShards.writeTo(out);
}
followRequest.writeTo(out);
}
@ -206,12 +240,23 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
Request request = (Request) o;
return Objects.equals(remoteCluster, request.remoteCluster) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(waitForActiveShards, request.waitForActiveShards) &&
Objects.equals(followRequest, request.followRequest);
}
@Override
public int hashCode() {
return Objects.hash(remoteCluster, leaderIndex, followRequest);
return Objects.hash(remoteCluster, leaderIndex, waitForActiveShards, followRequest);
}
@Override
public String toString() {
return "PutFollowAction.Request{" +
"remoteCluster='" + remoteCluster + '\'' +
", leaderIndex='" + leaderIndex + '\'' +
", waitForActiveShards=" + waitForActiveShards +
", followRequest=" + followRequest +
'}';
}
}
@ -280,6 +325,15 @@ public final class PutFollowAction extends Action<PutFollowAction.Response> {
public int hashCode() {
return Objects.hash(followIndexCreated, followIndexShardsAcked, indexFollowingStarted);
}
@Override
public String toString() {
return "PutFollowAction.Response{" +
"followIndexCreated=" + followIndexCreated +
", followIndexShardsAcked=" + followIndexShardsAcked +
", indexFollowingStarted=" + indexFollowingStarted +
'}';
}
}
}

View File

@ -61,6 +61,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
// Policy with the same name must exist in follower cluster too:
putILMPolicy(policyName, "50GB", null, TimeValue.timeValueHours(7*24));
followIndex(indexName, indexName);
ensureGreen(indexName);
// Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster:
client().performRequest(new Request("PUT", "/" + indexName + "/_alias/logs"));
@ -116,6 +117,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
} else if ("follow".equals(targetCluster)) {
createNewSingletonPolicy("unfollow-only", "hot", new UnfollowAction(), TimeValue.ZERO);
followIndex(indexName, indexName);
ensureGreen(indexName);
// Create the repository before taking the snapshot.
Request request = new Request("PUT", "/_snapshot/repo");
@ -210,7 +212,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
"\"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}, " +
"\"aliases\": {\"" + alias + "\": {\"is_write_index\": true}} }");
assertOK(leaderClient.performRequest(createIndexRequest));
// Check that the new index is creeg
// Check that the new index is created
Request checkIndexRequest = new Request("GET", "/_cluster/health/" + indexName);
checkIndexRequest.addParameter("wait_for_status", "green");
checkIndexRequest.addParameter("timeout", "70s");
@ -226,6 +228,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
index(leaderClient, indexName, "1");
assertDocumentExists(leaderClient, indexName, "1");
ensureGreen(indexName);
assertBusy(() -> {
assertDocumentExists(client(), indexName, "1");
// Sanity check that following_index setting has been set, so that we can verify later that this setting has been unset:

View File

@ -11,6 +11,13 @@
"required": true,
"description": "The name of the follower index"
}
},
"params": {
"wait_for_active_shards": {
"type" : "string",
"description" : "Sets the number of shard copies that must be active before returning. Defaults to 0. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)",
"default": "0"
}
}
},
"body": {