diff --git a/x-pack/plugin/ccr/build.gradle b/x-pack/plugin/ccr/build.gradle index 2f23b72ca8e..f3039244b6f 100644 --- a/x-pack/plugin/ccr/build.gradle +++ b/x-pack/plugin/ccr/build.gradle @@ -32,7 +32,11 @@ task internalClusterTest(type: RandomizedTestingTask, include '**/*IT.class' systemProperty 'es.set.netty.runtime.available.processors', 'false' } -check.dependsOn internalClusterTest + +check { + dependsOn = [internalClusterTest, 'qa:multi-cluster:followClusterTest'] +} + internalClusterTest.mustRunAfter test dependencies { diff --git a/x-pack/plugin/ccr/qa/build.gradle b/x-pack/plugin/ccr/qa/build.gradle new file mode 100644 index 00000000000..4a007422f38 --- /dev/null +++ b/x-pack/plugin/ccr/qa/build.gradle @@ -0,0 +1,12 @@ + +/* Remove assemble on all qa projects because we don't need to publish + * artifacts for them. */ +gradle.projectsEvaluated { + subprojects { + Task assemble = project.tasks.findByName('assemble') + if (assemble) { + project.tasks.remove(assemble) + project.build.dependsOn.remove('assemble') + } + } +} diff --git a/x-pack/plugin/ccr/qa/multi-cluster/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle new file mode 100644 index 00000000000..444575a1538 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle @@ -0,0 +1,40 @@ +import org.elasticsearch.gradle.test.RestIntegTestTask + +apply plugin: 'elasticsearch.standalone-test' + +dependencies { + testCompile project(path: xpackModule('core'), configuration: 'runtime') + testCompile project(path: xpackModule('ccr'), configuration: 'runtime') +} + +task leaderClusterTest(type: RestIntegTestTask) { + mustRunAfter(precommit) +} + +leaderClusterTestCluster { + numNodes = 1 + clusterName = 'leader-cluster' + plugin xpackProject('plugin').path +} + +leaderClusterTestRunner { + systemProperty 'tests.is_leader_cluster', 'true' +} + +task followClusterTest(type: RestIntegTestTask) {} + +followClusterTestCluster { + dependsOn leaderClusterTestRunner + numNodes = 1 + clusterName = 'follow-cluster' + plugin xpackProject('plugin').path + setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" +} + +followClusterTestRunner { + systemProperty 'tests.is_leader_cluster', 'false' + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + finalizedBy 'leaderClusterTestCluster#stop' +} + +test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java new file mode 100644 index 00000000000..09556bf7487 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr; + +import org.apache.http.HttpHost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.test.rest.ESRestTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; + +public class FollowIndexIT extends ESRestTestCase { + + private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster")); + + @Override + protected boolean preserveClusterUponCompletion() { + return true; + } + + public void testFollowIndex() throws Exception { + final int numDocs = 128; + final String leaderIndexName = "test_index1"; + if (runningAgainstLeaderCluster) { + logger.info("Running against leader cluster"); + for (int i = 0; i < numDocs; i++) { + logger.info("Indexing doc [{}]", i); + index(client(), leaderIndexName, Integer.toString(i), "field", i); + } + refresh(leaderIndexName); + verifyDocuments(leaderIndexName, numDocs); + } else { + logger.info("Running against follow cluster"); + final String followIndexName = "test_index2"; + Settings indexSettings = Settings.builder() + .put("index.xpack.ccr.following_index", true) + .build(); + // TODO: remove mapping here when ccr syncs mappings too + createIndex(followIndexName, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"integer\" }}}"); + ensureYellow(followIndexName); + + followIndex("leader_cluster:" + leaderIndexName, followIndexName); + assertBusy(() -> verifyDocuments(followIndexName, numDocs)); + + try (RestClient leaderClient = buildLeaderClient()) { + int id = numDocs; + index(leaderClient, leaderIndexName, Integer.toString(id), "field", id); + index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1); + index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2); + } + + assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3)); + } + } + + private static void index(RestClient client, String index, String id, Object... fields) throws IOException { + XContentBuilder document = jsonBuilder().startObject(); + for (int i = 0; i < fields.length; i += 2) { + document.field((String) fields[i], fields[i + 1]); + } + document.endObject(); + assertOK(client.performRequest("POST", "/" + index + "/doc/" + id, emptyMap(), + new StringEntity(Strings.toString(document), ContentType.APPLICATION_JSON))); + } + + private static void refresh(String index) throws IOException { + assertOK(client().performRequest("POST", "/" + index + "/_refresh")); + } + + private static void followIndex(String leaderIndex, String followIndex) throws IOException { + Map params = Collections.singletonMap("leader_index", leaderIndex); + assertOK(client().performRequest("POST", "/_xpack/ccr/" + followIndex + "/_follow", params)); + } + + private static void verifyDocuments(String index, int expectedNumDocs) throws IOException { + Map params = new HashMap<>(); + params.put("size", Integer.toString(expectedNumDocs)); + params.put("sort", "field:asc"); + Map response = toMap(client().performRequest("GET", "/" + index + "/_search", params)); + + int numDocs = (int) XContentMapValues.extractValue("hits.total", response); + assertThat(numDocs, equalTo(expectedNumDocs)); + + List hits = (List) XContentMapValues.extractValue("hits.hits", response); + assertThat(hits.size(), equalTo(expectedNumDocs)); + for (int i = 0; i < expectedNumDocs; i++) { + int value = (int) XContentMapValues.extractValue("_source.field", (Map) hits.get(i)); + assertThat(i, equalTo(value)); + } + } + + private static Map toMap(Response response) throws IOException { + return toMap(EntityUtils.toString(response.getEntity())); + } + + private static Map toMap(String response) { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); + } + + private static void ensureYellow(String index) throws IOException { + Map params = new HashMap<>(); + params.put("wait_for_status", "yellow"); + params.put("wait_for_no_relocating_shards", "true"); + params.put("timeout", "30s"); + params.put("level", "shards"); + assertOK(client().performRequest("GET", "_cluster/health/" + index, params)); + } + + private RestClient buildLeaderClient() throws IOException { + assert runningAgainstLeaderCluster == false; + String leaderUrl = System.getProperty("tests.leader_host"); + int portSeparator = leaderUrl.lastIndexOf(':'); + HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator), + Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol()); + return buildClient(Settings.EMPTY, new HttpHost[]{httpHost}); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java index 2456db6f79a..44628f1da5b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowExistingIndexAction.java @@ -12,24 +12,29 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -129,7 +134,7 @@ public class FollowExistingIndexAction extends Action { - public RequestBuilder(ElasticsearchClient client, Action action) { + RequestBuilder(ElasticsearchClient client, Action action) { super(client, action, new Request()); } } @@ -141,103 +146,132 @@ public class FollowExistingIndexAction extends Action { private final Client client; + private final ClusterService clusterService; + private final RemoteClusterService remoteClusterService; private final PersistentTasksService persistentTasksService; @Inject public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, Client client, + IndexNameExpressionResolver indexNameExpressionResolver, Client client, ClusterService clusterService, PersistentTasksService persistentTasksService) { super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); this.client = client; + this.clusterService = clusterService; + this.remoteClusterService = transportService.getRemoteClusterService(); this.persistentTasksService = persistentTasksService; } - @Override protected void doExecute(Request request, ActionListener listener) { - client.admin().cluster().state(new ClusterStateRequest(), new ActionListener() { - @Override - public void onResponse(ClusterStateResponse clusterStateResponse) { - IndexMetaData leaderIndexMetadata = clusterStateResponse.getState().getMetaData() - .index(request.leaderIndex); - if (leaderIndexMetadata == null) { - listener.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist")); - return; - } + ClusterState localClusterState = clusterService.state(); + IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex); - IndexMetaData followIndexMetadata = clusterStateResponse.getState().getMetaData() - .index(request.followIndex); - if (followIndexMetadata == null) { - listener.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist")); - return; - } + String[] indices = new String[]{request.getLeaderIndex()}; + Map> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); + if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + // Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData: + IndexMetaData leaderIndexMetadata = localClusterState.getMetaData().index(request.leaderIndex); + start(request, null, leaderIndexMetadata, followIndexMetadata, listener); + } else { + // Following an index in remote cluster, so use remote client to fetch leader IndexMetaData: + assert remoteClusterIndices.size() == 1; + Map.Entry> entry = remoteClusterIndices.entrySet().iterator().next(); + assert entry.getValue().size() == 1; + String clusterNameAlias = entry.getKey(); + String leaderIndex = entry.getValue().get(0); - if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) { - listener.onFailure(new IllegalArgumentException("leader index primary shards [" + - leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " + - "shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]")); - return; - } + Client remoteClient = client.getRemoteClusterClient(clusterNameAlias); + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex); + remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(r -> { + ClusterState remoteClusterState = r.getState(); + IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex); + start(request, clusterNameAlias, leaderIndexMetadata, followIndexMetadata, listener); + }, listener::onFailure)); + } + } - // TODO: other validation checks + /** + * Performs validation on the provided leader and follow {@link IndexMetaData} instances and then + * creates a persistent task for each leader primary shard. This persistent tasks track changes in the leader + * shard and replicate these changes to a follower shard. + * + * Currently the following validation is performed: + *
    + *
  • The leader index and follow index need to have the same number of primary shards
  • + *
+ */ + void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata, + ActionListener handler) { + if (leaderIndexMetadata == null) { + handler.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist")); + return; + } - final int numShards = followIndexMetadata.getNumberOfShards(); - final AtomicInteger counter = new AtomicInteger(numShards); - final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); - for (int i = 0; i < numShards; i++) { - final int shardId = i; - String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; - ShardFollowTask shardFollowTask = new ShardFollowTask(new ShardId(followIndexMetadata.getIndex(), shardId), - new ShardId(leaderIndexMetadata.getIndex(), shardId), request.batchSize, request.concurrentProcessors, - request.processorMaxTranslogBytes); - persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask, - new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { - responses.set(shardId, task); - finalizeResponse(); - } + if (followIndexMetadata == null) { + handler.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist")); + return; + } - @Override - public void onFailure(Exception e) { - responses.set(shardId, e); - finalizeResponse(); - } + if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) { + handler.onFailure(new IllegalArgumentException("leader index primary shards [" + + leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " + + "shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]")); + // TODO: other validation checks + } else { + final int numShards = followIndexMetadata.getNumberOfShards(); + final AtomicInteger counter = new AtomicInteger(numShards); + final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); + for (int i = 0; i < numShards; i++) { + final int shardId = i; + String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, + new ShardId(followIndexMetadata.getIndex(), shardId), + new ShardId(leaderIndexMetadata.getIndex(), shardId), + request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes); + persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask, + new ActionListener>() { + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { + responses.set(shardId, task); + finalizeResponse(); + } - void finalizeResponse() { - Exception error = null; - if (counter.decrementAndGet() == 0) { - for (int j = 0; j < responses.length(); j++) { - Object response = responses.get(j); - if (response instanceof Exception) { - if (error == null) { - error = (Exception) response; - } else { - error.addSuppressed((Throwable) response); + @Override + public void onFailure(Exception e) { + responses.set(shardId, e); + finalizeResponse(); + } + + void finalizeResponse() { + Exception error = null; + if (counter.decrementAndGet() == 0) { + for (int j = 0; j < responses.length(); j++) { + Object response = responses.get(j); + if (response instanceof Exception) { + if (error == null) { + error = (Exception) response; + } else { + error.addSuppressed((Throwable) response); + } } } - } - if (error == null) { - // include task ids? - listener.onResponse(new Response()); - } else { - // TODO: cancel all started tasks - listener.onFailure(error); + if (error == null) { + // include task ids? + handler.onResponse(new Response()); + } else { + // TODO: cancel all started tasks + handler.onFailure(error); + } } } } - - }); - } + ); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + } } } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index e153bb2ec56..09202c925ba 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -24,6 +24,7 @@ public class ShardFollowTask implements PersistentTaskParams { public static final String NAME = "shard_follow"; + static final ParseField LEADER_CLUSTER_ALIAS_FIELD = new ParseField("leader_cluster_alias"); static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index"); static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid"); static final ParseField FOLLOW_SHARD_SHARDID_FIELD = new ParseField("follow_shard_shard"); @@ -35,10 +36,11 @@ public class ShardFollowTask implements PersistentTaskParams { public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes"); public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - (a) -> new ShardFollowTask(new ShardId((String) a[0], (String) a[1], (int) a[2]), - new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6], (int) a[7], (long) a[8])); + (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), + new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9])); static { + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_UUID_FIELD); PARSER.declareInt(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_SHARDID_FIELD); @@ -50,14 +52,16 @@ public class ShardFollowTask implements PersistentTaskParams { PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST); } + private final String leaderClusterAlias; private final ShardId followShardId; private final ShardId leaderShardId; private final long maxChunkSize; private final int numConcurrentChunks; private final long processorMaxTranslogBytes; - ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long maxChunkSize, int numConcurrentChunks, - long processorMaxTranslogBytes) { + ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, long maxChunkSize, + int numConcurrentChunks, long processorMaxTranslogBytes) { + this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; this.maxChunkSize = maxChunkSize; @@ -66,6 +70,7 @@ public class ShardFollowTask implements PersistentTaskParams { } public ShardFollowTask(StreamInput in) throws IOException { + this.leaderClusterAlias = in.readOptionalString(); this.followShardId = ShardId.readShardId(in); this.leaderShardId = ShardId.readShardId(in); this.maxChunkSize = in.readVLong(); @@ -73,6 +78,10 @@ public class ShardFollowTask implements PersistentTaskParams { this.processorMaxTranslogBytes = in.readVLong(); } + public String getLeaderClusterAlias() { + return leaderClusterAlias; + } + public ShardId getFollowShardId() { return followShardId; } @@ -100,6 +109,7 @@ public class ShardFollowTask implements PersistentTaskParams { @Override public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(leaderClusterAlias); followShardId.writeTo(out); leaderShardId.writeTo(out); out.writeVLong(maxChunkSize); @@ -114,6 +124,9 @@ public class ShardFollowTask implements PersistentTaskParams { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + if (leaderClusterAlias != null) { + builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias); + } builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName()); builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID()); builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id()); @@ -131,7 +144,8 @@ public class ShardFollowTask implements PersistentTaskParams { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ShardFollowTask that = (ShardFollowTask) o; - return Objects.equals(followShardId, that.followShardId) && + return Objects.equals(leaderClusterAlias, that.leaderClusterAlias) && + Objects.equals(followShardId, that.followShardId) && Objects.equals(leaderShardId, that.leaderShardId) && maxChunkSize == that.maxChunkSize && numConcurrentChunks == that.numConcurrentChunks && @@ -140,7 +154,8 @@ public class ShardFollowTask implements PersistentTaskParams { @Override public int hashCode() { - return Objects.hash(followShardId, leaderShardId, maxChunkSize, numConcurrentChunks, processorMaxTranslogBytes); + return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxChunkSize, numConcurrentChunks, + processorMaxTranslogBytes); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 2cfc836363a..9f62b014a18 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -64,12 +64,16 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor prepare(shardFollowNodeTask, params, followGlobalCheckPoint), task::markAsFailed); + fetchGlobalCheckpoint(client, params.getFollowShardId(), + followGlobalCheckPoint -> prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint), task::markAsFailed); } - void prepare(ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) { + void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) { if (task.getState() != AllocatedPersistentTask.State.STARTED) { // TODO: need better cancellation control return; @@ -98,10 +104,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { + fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> { // TODO: check if both indices have the same history uuid if (leaderGlobalCheckPoint == followGlobalCheckPoint) { - retry(task, params, followGlobalCheckPoint); + retry(leaderClient, task, params, followGlobalCheckPoint); } else { assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint + "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]"; @@ -109,12 +115,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler = e -> { if (e == null) { task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint); - prepare(task, params, leaderGlobalCheckPoint); + prepare(leaderClient, task, params, leaderGlobalCheckPoint); } else { task.markAsFailed(e); } }; - ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, params.getMaxChunkSize(), + ChunksCoordinator coordinator = new ChunksCoordinator(client, leaderClient, ccrExecutor, params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler); coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); coordinator.start(); @@ -122,7 +128,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor errorHandler) { + private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer errorHandler) { client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { IndexStats indexStats = r.getIndex(shardId.getIndexName()); Optional filteredShardStats = Arrays.stream(indexStats.getShards()) @@ -158,7 +164,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor chunks = new ConcurrentLinkedQueue<>(); private final AtomicReference failureHolder = new AtomicReference<>(); - ChunksCoordinator(Client client, Executor ccrExecutor, long batchSize, int concurrentProcessors, + ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard, ShardId followerShard, Consumer handler) { - this.client = client; + this.followerClient = followerClient; + this.leaderClient = leaderClient; this.ccrExecutor = ccrExecutor; this.batchSize = batchSize; this.concurrentProcessors = concurrentProcessors; @@ -220,7 +228,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { + Consumer processorHandler = e -> { if (e == null) { LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); processNextChunk(); @@ -229,7 +237,9 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor chunks; private final Executor ccrExecutor; @@ -260,9 +271,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler; final AtomicInteger retryCounter = new AtomicInteger(0); - ChunkProcessor(Client client, Queue chunks, Executor ccrExecutor, ShardId leaderShard, + ChunkProcessor(Client leaderClient, Client followerClient, Queue chunks, Executor ccrExecutor, ShardId leaderShard, ShardId followerShard, Consumer handler) { - this.client = client; + this.leaderClient = leaderClient; + this.followerClient = followerClient; this.chunks = chunks; this.ccrExecutor = ccrExecutor; this.leaderShard = leaderShard; @@ -275,7 +287,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor() { + leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener() { @Override public void onResponse(ShardChangesAction.Response response) { handleResponse(to, response); @@ -317,7 +329,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor() { + followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener() { @Override public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) { handler.accept(null); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java index 5eb808fa489..937e62f41bf 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java @@ -49,7 +49,7 @@ public class ChunksCoordinatorTests extends ESTestCase { ShardId followShardId = new ShardId("index2", "index1", 0); ChunksCoordinator coordinator = - new ChunksCoordinator(client, ccrExecutor, 1024, 1, Long.MAX_VALUE, leaderShardId, followShardId, e -> {}); + new ChunksCoordinator(client, client, ccrExecutor, 1024, 1, Long.MAX_VALUE, leaderShardId, followShardId, e -> {}); coordinator.createChucks(0, 1024); List result = new ArrayList<>(coordinator.getChunks()); assertThat(result.size(), equalTo(1)); @@ -105,7 +105,7 @@ public class ChunksCoordinatorTests extends ESTestCase { Consumer handler = e -> assertThat(e, nullValue()); int concurrentProcessors = randomIntBetween(1, 4); int batchSize = randomIntBetween(1, 1000); - ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, batchSize, concurrentProcessors, + ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, batchSize, concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler); int numberOfOps = randomIntBetween(batchSize, batchSize * 20); @@ -149,7 +149,7 @@ public class ChunksCoordinatorTests extends ESTestCase { assertThat(e, sameInstance(expectedException)); }; ChunksCoordinator coordinator = - new ChunksCoordinator(client, ccrExecutor, 10, 1, Long.MAX_VALUE, leaderShardId, followShardId, handler); + new ChunksCoordinator(client, client, ccrExecutor, 10, 1, Long.MAX_VALUE, leaderShardId, followShardId, handler); coordinator.createChucks(0, 20); assertThat(coordinator.getChunks().size(), equalTo(2)); @@ -180,7 +180,7 @@ public class ChunksCoordinatorTests extends ESTestCase { latch.countDown(); }; ChunksCoordinator coordinator = - new ChunksCoordinator(client, ccrExecutor, 1000, 4, Long.MAX_VALUE, leaderShardId, followShardId, handler); + new ChunksCoordinator(client, client, ccrExecutor, 1000, 4, Long.MAX_VALUE, leaderShardId, followShardId, handler); coordinator.createChucks(0, 1000000); assertThat(coordinator.getChunks().size(), equalTo(1000)); @@ -206,7 +206,7 @@ public class ChunksCoordinatorTests extends ESTestCase { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler); chunkProcessor.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], nullValue()); @@ -226,7 +226,7 @@ public class ChunksCoordinatorTests extends ESTestCase { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler); chunkProcessor.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], nullValue()); @@ -247,7 +247,7 @@ public class ChunksCoordinatorTests extends ESTestCase { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler); chunkProcessor.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], notNullValue()); @@ -269,7 +269,7 @@ public class ChunksCoordinatorTests extends ESTestCase { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler); chunkProcessor.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], notNullValue()); @@ -305,7 +305,7 @@ public class ChunksCoordinatorTests extends ESTestCase { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler); + ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler); chunkProcessor.start(from, to, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], nullValue()); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index d92caed93b1..148f3acd209 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -22,6 +22,7 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase